Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rectify inconsistencies in DMS event subscription code and documentation #33731

Merged
merged 10 commits into from
Dec 14, 2023
3 changes: 3 additions & 0 deletions .changelog/33731.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
resource/aws_dms_event_subscription: `source_ids` and `source_type` are Required
```
7 changes: 7 additions & 0 deletions internal/service/dms/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,10 @@ func networkType_Values() []string {
networkTypeIPv4,
}
}

const (
eventSubscriptionStatusActive = "active"
eventSubscriptionStatusCreating = "creating"
eventSubscriptionStatusDeleting = "deleting"
eventSubscriptionStatusModifying = "modifying"
)
235 changes: 141 additions & 94 deletions internal/service/dms/event_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/hashicorp/terraform-provider-aws/internal/errs/sdkdiag"
"github.com/hashicorp/terraform-provider-aws/internal/flex"
tftags "github.com/hashicorp/terraform-provider-aws/internal/tags"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
"github.com/hashicorp/terraform-provider-aws/internal/verify"
"github.com/hashicorp/terraform-provider-aws/names"
)
Expand All @@ -33,6 +34,7 @@ func ResourceEventSubscription() *schema.Resource {
ReadWithoutTimeout: resourceEventSubscriptionRead,
UpdateWithoutTimeout: resourceEventSubscriptionUpdate,
DeleteWithoutTimeout: resourceEventSubscriptionDelete,

Timeouts: &schema.ResourceTimeout{
Create: schema.DefaultTimeout(10 * time.Minute),
Delete: schema.DefaultTimeout(10 * time.Minute),
Expand All @@ -56,7 +58,6 @@ func ResourceEventSubscription() *schema.Resource {
"event_categories": {
Type: schema.TypeSet,
Elem: &schema.Schema{Type: schema.TypeString},
Set: schema.HashString,
Required: true,
},
"name": {
Expand All @@ -73,14 +74,12 @@ func ResourceEventSubscription() *schema.Resource {
"source_ids": {
Type: schema.TypeSet,
Elem: &schema.Schema{Type: schema.TypeString},
Set: schema.HashString,
Required: true,
ForceNew: true,
Optional: true,
},
"source_type": {
Type: schema.TypeString,
Optional: true,
// The API suppors modification but doing so loses all source_ids
Required: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{
"replication-instance",
Expand All @@ -99,42 +98,27 @@ func resourceEventSubscriptionCreate(ctx context.Context, d *schema.ResourceData
var diags diag.Diagnostics
conn := meta.(*conns.AWSClient).DMSConn(ctx)

request := &dms.CreateEventSubscriptionInput{
name := d.Get("name").(string)
input := &dms.CreateEventSubscriptionInput{
Enabled: aws.Bool(d.Get("enabled").(bool)),
EventCategories: flex.ExpandStringSet(d.Get("event_categories").(*schema.Set)),
SnsTopicArn: aws.String(d.Get("sns_topic_arn").(string)),
SubscriptionName: aws.String(d.Get("name").(string)),
SourceIds: flex.ExpandStringSet(d.Get("source_ids").(*schema.Set)),
SourceType: aws.String(d.Get("source_type").(string)),
SubscriptionName: aws.String(name),
Tags: getTagsIn(ctx),
}

if v, ok := d.GetOk("event_categories"); ok {
request.EventCategories = flex.ExpandStringSet(v.(*schema.Set))
}

if v, ok := d.GetOk("source_ids"); ok {
request.SourceIds = flex.ExpandStringSet(v.(*schema.Set))
}

_, err := conn.CreateEventSubscriptionWithContext(ctx, request)
_, err := conn.CreateEventSubscriptionWithContext(ctx, input)

if err != nil {
return sdkdiag.AppendErrorf(diags, "creating DMS Event Subscription (%s): %s", d.Get("name").(string), err)
return sdkdiag.AppendErrorf(diags, "creating DMS Event Subscription (%s): %s", name, err)
}

d.SetId(d.Get("name").(string))

stateConf := &retry.StateChangeConf{
Pending: []string{"creating", "modifying"},
Target: []string{"active"},
Refresh: resourceEventSubscriptionStateRefreshFunc(ctx, conn, d.Id()),
Timeout: d.Timeout(schema.TimeoutCreate),
MinTimeout: 10 * time.Second,
Delay: 10 * time.Second,
}
d.SetId(name)

_, err = stateConf.WaitForStateContext(ctx)
if err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DMS Event Subscription (%s) creation: %s", d.Id(), err)
if _, err := waitEventSubscriptionCreated(ctx, conn, d.Id(), d.Timeout(schema.TimeoutCreate)); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DMS Event Subscription (%s) create: %s", d.Id(), err)
}

return append(diags, resourceEventSubscriptionRead(ctx, d, meta)...)
Expand All @@ -144,30 +128,18 @@ func resourceEventSubscriptionRead(ctx context.Context, d *schema.ResourceData,
var diags diag.Diagnostics
conn := meta.(*conns.AWSClient).DMSConn(ctx)

request := &dms.DescribeEventSubscriptionsInput{
SubscriptionName: aws.String(d.Id()),
}

response, err := conn.DescribeEventSubscriptionsWithContext(ctx, request)
subscription, err := FindEventSubscriptionByName(ctx, conn, d.Id())

if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) {
log.Printf("[WARN] DMS event subscription (%s) not found, removing from state", d.Id())
if !d.IsNewResource() && tfresource.NotFound(err) {
log.Printf("[WARN] DMS Event Subscription (%s) not found, removing from state", d.Id())
d.SetId("")
return diags
}

if err != nil {
return sdkdiag.AppendErrorf(diags, "reading DMS event subscription: %s", err)
}

if response == nil || len(response.EventSubscriptionsList) == 0 || response.EventSubscriptionsList[0] == nil {
log.Printf("[WARN] DMS event subscription (%s) not found, removing from state", d.Id())
d.SetId("")
return diags
return sdkdiag.AppendErrorf(diags, "reading DMS Event Subscription (%s): %s", d.Id(), err)
}

subscription := response.EventSubscriptionsList[0]

arn := arn.ARN{
Partition: meta.(*conns.AWSClient).Partition,
Service: "dms",
Expand All @@ -176,13 +148,12 @@ func resourceEventSubscriptionRead(ctx context.Context, d *schema.ResourceData,
Resource: fmt.Sprintf("es:%s", d.Id()),
}.String()
d.Set("arn", arn)

d.Set("enabled", subscription.Enabled)
d.Set("event_categories", aws.StringValueSlice(subscription.EventCategoriesList))
d.Set("name", d.Id())
d.Set("sns_topic_arn", subscription.SnsTopicArn)
d.Set("source_ids", aws.StringValueSlice(subscription.SourceIdsList))
d.Set("source_type", subscription.SourceType)
d.Set("name", d.Id())
d.Set("event_categories", flex.FlattenStringList(subscription.EventCategoriesList))
d.Set("source_ids", flex.FlattenStringList(subscription.SourceIdsList))

return diags
}
Expand All @@ -191,36 +162,23 @@ func resourceEventSubscriptionUpdate(ctx context.Context, d *schema.ResourceData
var diags diag.Diagnostics
conn := meta.(*conns.AWSClient).DMSConn(ctx)

if d.HasChanges("enabled", "event_categories", "sns_topic_arn", "source_type") {
request := &dms.ModifyEventSubscriptionInput{
if d.HasChangesExcept("tags", "tags_all") {
input := &dms.ModifyEventSubscriptionInput{
Enabled: aws.Bool(d.Get("enabled").(bool)),
EventCategories: flex.ExpandStringSet(d.Get("event_categories").(*schema.Set)),
SnsTopicArn: aws.String(d.Get("sns_topic_arn").(string)),
SubscriptionName: aws.String(d.Get("name").(string)),
SourceType: aws.String(d.Get("source_type").(string)),
SubscriptionName: aws.String(d.Id()),
}

if v, ok := d.GetOk("event_categories"); ok {
request.EventCategories = flex.ExpandStringSet(v.(*schema.Set))
}

_, err := conn.ModifyEventSubscriptionWithContext(ctx, request)
_, err := conn.ModifyEventSubscriptionWithContext(ctx, input)

if err != nil {
return sdkdiag.AppendErrorf(diags, "updating DMS Event Subscription (%s): %s", d.Id(), err)
}

stateConf := &retry.StateChangeConf{
Pending: []string{"modifying"},
Target: []string{"active"},
Refresh: resourceEventSubscriptionStateRefreshFunc(ctx, conn, d.Id()),
Timeout: d.Timeout(schema.TimeoutUpdate),
MinTimeout: 10 * time.Second,
Delay: 10 * time.Second,
return sdkdiag.AppendErrorf(diags, "modifying DMS Event Subscription (%s): %s", d.Id(), err)
}

_, err = stateConf.WaitForStateContext(ctx)
if err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DMS Event Subscription (%s) modification: %s", d.Id(), err)
if _, err := waitEventSubscriptionUpdated(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate)); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DMS Event Subscription (%s) update: %s", d.Id(), err)
}
}

Expand All @@ -231,11 +189,10 @@ func resourceEventSubscriptionDelete(ctx context.Context, d *schema.ResourceData
var diags diag.Diagnostics
conn := meta.(*conns.AWSClient).DMSConn(ctx)

request := &dms.DeleteEventSubscriptionInput{
log.Printf("[DEBUG] Deleting DMS Event Subscription: %s", d.Id())
_, err := conn.DeleteEventSubscriptionWithContext(ctx, &dms.DeleteEventSubscriptionInput{
SubscriptionName: aws.String(d.Id()),
}

_, err := conn.DeleteEventSubscriptionWithContext(ctx, request)
})

if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) {
return diags
Expand All @@ -245,41 +202,131 @@ func resourceEventSubscriptionDelete(ctx context.Context, d *schema.ResourceData
return sdkdiag.AppendErrorf(diags, "deleting DMS Event Subscription (%s): %s", d.Id(), err)
}

stateConf := &retry.StateChangeConf{
Pending: []string{"deleting"},
Target: []string{},
Refresh: resourceEventSubscriptionStateRefreshFunc(ctx, conn, d.Id()),
Timeout: d.Timeout(schema.TimeoutDelete),
MinTimeout: 10 * time.Second,
Delay: 10 * time.Second,
if _, err := waitEventSubscriptionDeleted(ctx, conn, d.Id(), d.Timeout(schema.TimeoutDelete)); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DMS Event Subscription (%s) delete: %s", d.Id(), err)
}

_, err = stateConf.WaitForStateContext(ctx)
return diags
}

func FindEventSubscriptionByName(ctx context.Context, conn *dms.DatabaseMigrationService, name string) (*dms.EventSubscription, error) {
input := &dms.DescribeEventSubscriptionsInput{
SubscriptionName: aws.String(name),
}

return findEventSubscription(ctx, conn, input)
}

func findEventSubscription(ctx context.Context, conn *dms.DatabaseMigrationService, input *dms.DescribeEventSubscriptionsInput) (*dms.EventSubscription, error) {
output, err := findEventSubscriptions(ctx, conn, input)

if err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DMS Event Subscription (%s) deletion: %s", d.Id(), err)
return nil, err
}

return diags
return tfresource.AssertSinglePtrResult(output)
}

func resourceEventSubscriptionStateRefreshFunc(ctx context.Context, conn *dms.DatabaseMigrationService, name string) retry.StateRefreshFunc {
func findEventSubscriptions(ctx context.Context, conn *dms.DatabaseMigrationService, input *dms.DescribeEventSubscriptionsInput) ([]*dms.EventSubscription, error) {
var output []*dms.EventSubscription

err := conn.DescribeEventSubscriptionsPagesWithContext(ctx, input, func(page *dms.DescribeEventSubscriptionsOutput, lastPage bool) bool {
if page == nil {
return !lastPage
}

for _, v := range page.EventSubscriptionsList {
if v != nil {
output = append(output, v)
}
}

return !lastPage
})

if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) {
return nil, &retry.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if err != nil {
return nil, err
}

return output, nil
}

func statusEventSubscription(ctx context.Context, conn *dms.DatabaseMigrationService, name string) retry.StateRefreshFunc {
return func() (interface{}, string, error) {
v, err := conn.DescribeEventSubscriptionsWithContext(ctx, &dms.DescribeEventSubscriptionsInput{
SubscriptionName: aws.String(name),
})
output, err := FindEventSubscriptionByName(ctx, conn, name)

if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) {
if tfresource.NotFound(err) {
return nil, "", nil
}

if err != nil {
return nil, "", err
}

if v == nil || len(v.EventSubscriptionsList) == 0 || v.EventSubscriptionsList[0] == nil {
return nil, "", nil
}
return output, aws.StringValue(output.Status), nil
}
}

return v, aws.StringValue(v.EventSubscriptionsList[0].Status), nil
func waitEventSubscriptionCreated(ctx context.Context, conn *dms.DatabaseMigrationService, name string, timeout time.Duration) (*dms.EventSubscription, error) {
stateConf := &retry.StateChangeConf{
Pending: []string{eventSubscriptionStatusCreating, eventSubscriptionStatusModifying},
Target: []string{eventSubscriptionStatusActive},
Refresh: statusEventSubscription(ctx, conn, name),
Timeout: timeout,
MinTimeout: 10 * time.Second,
Delay: 10 * time.Second,
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if output, ok := outputRaw.(*dms.EventSubscription); ok {
return output, err
}

return nil, err
}

func waitEventSubscriptionUpdated(ctx context.Context, conn *dms.DatabaseMigrationService, name string, timeout time.Duration) (*dms.EventSubscription, error) {
stateConf := &retry.StateChangeConf{
Pending: []string{eventSubscriptionStatusModifying},
Target: []string{eventSubscriptionStatusActive},
Refresh: statusEventSubscription(ctx, conn, name),
Timeout: timeout,
MinTimeout: 10 * time.Second,
Delay: 10 * time.Second,
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if output, ok := outputRaw.(*dms.EventSubscription); ok {
return output, err
}

return nil, err
}

func waitEventSubscriptionDeleted(ctx context.Context, conn *dms.DatabaseMigrationService, name string, timeout time.Duration) (*dms.EventSubscription, error) {
stateConf := &retry.StateChangeConf{
Pending: []string{eventSubscriptionStatusDeleting},
Target: []string{},
Refresh: statusEventSubscription(ctx, conn, name),
Timeout: timeout,
MinTimeout: 10 * time.Second,
Delay: 10 * time.Second,
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if output, ok := outputRaw.(*dms.EventSubscription); ok {
return output, err
}

return nil, err
}
Loading