From 0fbe6da7b4fd366498d2eb2f33b11f70411af83e Mon Sep 17 00:00:00 2001 From: Kristof Martens Date: Thu, 3 Nov 2022 12:58:28 +0100 Subject: [PATCH 1/6] Add the option to preservce the source data type for S3 output format config --- internal/service/appflow/flow.go | 16 ++++++++++++++++ website/docs/r/appflow_flow.html.markdown | 1 + 2 files changed, 17 insertions(+) diff --git a/internal/service/appflow/flow.go b/internal/service/appflow/flow.go index a5817b48d951..e79ea66cbf97 100644 --- a/internal/service/appflow/flow.go +++ b/internal/service/appflow/flow.go @@ -378,6 +378,10 @@ func ResourceFlow() *schema.Resource { }, }, }, + "preserve_source_data_typing": { + Type: schema.TypeBool, + Optional: true, + }, }, }, }, @@ -610,6 +614,10 @@ func ResourceFlow() *schema.Resource { }, }, }, + "preserve_source_data_typing": { + Type: schema.TypeBool, + Optional: true, + }, }, }, }, @@ -1720,6 +1728,10 @@ func expandS3OutputFormatConfig(tfMap map[string]interface{}) *appflow.S3OutputF a.PrefixConfig = expandPrefixConfig(v[0].(map[string]interface{})) } + if v, ok := tfMap["preserve_source_data_typing"].(bool); ok { + a.PreserveSourceDataTyping = aws.Bool(v) + } + return a } @@ -2824,6 +2836,10 @@ func flattenS3OutputFormatConfig(s3OutputFormatConfig *appflow.S3OutputFormatCon m["prefix_config"] = []interface{}{flattenPrefixConfig(v)} } + if v := s3OutputFormatConfig.PreserveSourceDataTyping; v != nil { + m["preserve_source_data_typing"] = aws.BoolValue(v) + } + return m } diff --git a/website/docs/r/appflow_flow.html.markdown b/website/docs/r/appflow_flow.html.markdown index 667f38ba8608..b5460a69d53d 100644 --- a/website/docs/r/appflow_flow.html.markdown +++ b/website/docs/r/appflow_flow.html.markdown @@ -202,6 +202,7 @@ EventBridge, Honeycode, and Marketo destination properties all support the follo * `aggregation_config` - (Optional) Aggregation settings that you can use to customize the output format of your flow data. See [Aggregation Config](#aggregation-config) for more details. * `file_type` - (Optional) File type that Amazon AppFlow places in the Amazon S3 bucket. Valid values are `CSV`, `JSON`, and `PARQUET`. * `prefix_config` - (Optional) Determines the prefix that Amazon AppFlow applies to the folder name in the Amazon S3 bucket. You can name folders according to the flow frequency and date. See [Prefix Config](#prefix-config) for more details. +* `preserve_source_data_typing` - (Optional, Boolean) Whether the data types from the source system need to be preserved (Only valid for `Parquet` file type) ##### Salesforce Destination Properties From 81f235c2ab7dc01d5edfce62c55f4b0ca55d535f Mon Sep 17 00:00:00 2001 From: Kristof Martens Date: Thu, 3 Nov 2022 13:31:53 +0100 Subject: [PATCH 2/6] Updating Changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 31324b17ac57..ffb568c73097 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ ENHANCEMENTS: * resource/aws_network_interface_attachment: Added import capabilities for the resource ([#27364](https://github.com/hashicorp/terraform-provider-aws/issues/27364)) * resource/aws_sesv2_dedicated_ip_pool: Add `scaling_mode` attribute ([#27388](https://github.com/hashicorp/terraform-provider-aws/issues/27388)) * resource/aws_ssm_parameter: Support `aws:ssm:integration` as a valid value for `data_type` ([#27329](https://github.com/hashicorp/terraform-provider-aws/issues/27329)) +* resource/aws_appflow_flow: Add support for preserving source data types in the S3 output config ([#26372](https://github.com/hashicorp/terraform-provider-aws/issues/26372)) BUG FIXES: From a9c57b4a986836de996e8e0a18a922f5f885ac55 Mon Sep 17 00:00:00 2001 From: Adrian Johnson Date: Fri, 10 Mar 2023 10:59:58 -0600 Subject: [PATCH 3/6] remove CHANGELOG entry --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ffb568c73097..31324b17ac57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,7 +62,6 @@ ENHANCEMENTS: * resource/aws_network_interface_attachment: Added import capabilities for the resource ([#27364](https://github.com/hashicorp/terraform-provider-aws/issues/27364)) * resource/aws_sesv2_dedicated_ip_pool: Add `scaling_mode` attribute ([#27388](https://github.com/hashicorp/terraform-provider-aws/issues/27388)) * resource/aws_ssm_parameter: Support `aws:ssm:integration` as a valid value for `data_type` ([#27329](https://github.com/hashicorp/terraform-provider-aws/issues/27329)) -* resource/aws_appflow_flow: Add support for preserving source data types in the S3 output config ([#26372](https://github.com/hashicorp/terraform-provider-aws/issues/26372)) BUG FIXES: From 46074ce450114c2019db50888dea99213ee0c6d9 Mon Sep 17 00:00:00 2001 From: Adrian Johnson Date: Fri, 10 Mar 2023 11:10:35 -0600 Subject: [PATCH 4/6] r/aws_appflow_flow: remove attribute from upsolver --- internal/service/appflow/flow.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/internal/service/appflow/flow.go b/internal/service/appflow/flow.go index 69b58420fb61..b169c10e4fb0 100644 --- a/internal/service/appflow/flow.go +++ b/internal/service/appflow/flow.go @@ -620,10 +620,6 @@ func ResourceFlow() *schema.Resource { }, }, }, - "preserve_source_data_typing": { - Type: schema.TypeBool, - Optional: true, - }, }, }, }, From 21dbb9444b246ef6dad489d222ed4e617cfab53a Mon Sep 17 00:00:00 2001 From: Adrian Johnson Date: Fri, 10 Mar 2023 11:14:35 -0600 Subject: [PATCH 5/6] add CHANGLOG entry --- .changelog/27616.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/27616.txt diff --git a/.changelog/27616.txt b/.changelog/27616.txt new file mode 100644 index 000000000000..80a02a006dbd --- /dev/null +++ b/.changelog/27616.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +resource/aws_appflow_flow: Add attribute `preserve_source_data_typing` to `s3_output_format_config` in `s3` +``` \ No newline at end of file From 07201e958b0de3b9bad13b674658d376400ac98a Mon Sep 17 00:00:00 2001 From: Adrian Johnson Date: Fri, 10 Mar 2023 11:52:21 -0600 Subject: [PATCH 6/6] r/aws_appflow_flow: add test --- internal/service/appflow/flow_test.go | 117 ++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/internal/service/appflow/flow_test.go b/internal/service/appflow/flow_test.go index 3dccea858c8b..6ac477f43e3e 100644 --- a/internal/service/appflow/flow_test.go +++ b/internal/service/appflow/flow_test.go @@ -68,6 +68,53 @@ func TestAccAppFlowFlow_basic(t *testing.T) { }) } +func TestAccAppFlowFlow_S3_outputFormatConfig_ParquetFileType(t *testing.T) { + ctx := acctest.Context(t) + var flowOutput appflow.FlowDefinition + rSourceName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + rDestinationName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + rFlowName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_appflow_flow.test" + scheduleStartTime := time.Now().UTC().AddDate(0, 0, 1).Format(time.RFC3339) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + ErrorCheck: acctest.ErrorCheck(t, appflow.EndpointsID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckFlowDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccFlowConfig_S3_OutputFormatConfig_ParquetFileType(rSourceName, rDestinationName, rFlowName, scheduleStartTime, "PARQUET", true), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckFlowExists(ctx, resourceName, &flowOutput), + resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.#"), + resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.connector_type"), + resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.destination_connector_properties.#"), + resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.preserve_source_data_typing", "true"), + resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.file_type", "PARQUET"), + resource.TestCheckResourceAttrSet(resourceName, "task.#"), + resource.TestCheckResourceAttrSet(resourceName, "task.0.source_fields.#"), + resource.TestCheckResourceAttrSet(resourceName, "task.0.task_type"), + ), + }, + { + Config: testAccFlowConfig_S3_OutputFormatConfig_ParquetFileType(rSourceName, rDestinationName, rFlowName, scheduleStartTime, "PARQUET", false), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckFlowExists(ctx, resourceName, &flowOutput), + resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.#"), + resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.connector_type"), + resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.destination_connector_properties.#"), + resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.preserve_source_data_typing", "false"), + resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.file_type", "PARQUET"), + resource.TestCheckResourceAttrSet(resourceName, "task.#"), + resource.TestCheckResourceAttrSet(resourceName, "task.0.source_fields.#"), + resource.TestCheckResourceAttrSet(resourceName, "task.0.task_type"), + ), + }, + }, + }) +} + func TestAccAppFlowFlow_update(t *testing.T) { ctx := acctest.Context(t) var flowOutput appflow.FlowDefinition @@ -344,6 +391,76 @@ resource "aws_appflow_flow" "test" { ) } +func testAccFlowConfig_S3_OutputFormatConfig_ParquetFileType(rSourceName, rDestinationName, rFlowName, scheduleStartTime, fileType string, preserveSourceDataTyping bool) string { + return acctest.ConfigCompose( + testAccFlowConfig_base(rSourceName, rDestinationName), + fmt.Sprintf(` +resource "aws_appflow_flow" "test" { + name = %[1]q + + source_flow_config { + connector_type = "S3" + source_connector_properties { + s3 { + bucket_name = aws_s3_bucket_policy.test_source.bucket + bucket_prefix = "flow" + } + } + } + + destination_flow_config { + connector_type = "S3" + destination_connector_properties { + s3 { + bucket_name = aws_s3_bucket_policy.test_destination.bucket + + s3_output_format_config { + prefix_config { + prefix_type = "PATH" + } + + file_type = %[3]q + preserve_source_data_typing = %[4]t + + aggregation_config { + aggregation_type = "None" + } + } + } + } + } + + task { + source_fields = ["testField"] + destination_field = "testField" + task_type = "Map" + + task_properties = { + "DESTINATION_DATA_TYPE" = "string" + "SOURCE_DATA_TYPE" = "string" + } + + connector_operator { + s3 = "NO_OP" + } + } + + trigger_config { + trigger_type = "Scheduled" + + trigger_properties { + scheduled { + data_pull_mode = "Incremental" + schedule_expression = "rate(3hours)" + schedule_start_time = %[2]q + } + } + } +} +`, rFlowName, scheduleStartTime, fileType, preserveSourceDataTyping), + ) +} + func testAccFlowConfig_update(rSourceName string, rDestinationName string, rFlowName string, description string) string { return acctest.ConfigCompose( testAccFlowConfig_base(rSourceName, rDestinationName),