diff --git a/ksql/client/client.go b/ksql/client/client.go index 987282d..a5d1318 100644 --- a/ksql/client/client.go +++ b/ksql/client/client.go @@ -42,7 +42,7 @@ func (c *Client) RotateCredentials(url, username, password string) { } } -func (c *Client) ExecuteQuery(ctx context.Context, name, qType, query string) (string, error) { +func (c *Client) ExecuteQuery(ctx context.Context, name, qType, query string, ignoreAlreadyExists bool) (string, error) { var ( err error res Response @@ -64,10 +64,13 @@ func (c *Client) ExecuteQuery(ctx context.Context, name, qType, query string) (s } if res.ErrorCode != 0 { + if ignoreAlreadyExists && strings.Contains(res.Message, "already exists") { + break + } err = fmt.Errorf("invalid ksql response %s", res.Message) if strings.HasPrefix(query, "DROP") { if terminateQuery, shouldTerminate := c.getPreHookTerminateQuery(res.Message); shouldTerminate { - _, err = c.ExecuteQuery(ctx, name, qType, terminateQuery) + _, err = c.ExecuteQuery(ctx, name, qType, terminateQuery, ignoreAlreadyExists) } } tflog.Warn(ctx, fmt.Sprintf("failed to make post ksql request [%v] retrying...", err)) diff --git a/ksql/resource_ksql_query.go b/ksql/resource_ksql_query.go index 321ee6f..68a5105 100644 --- a/ksql/resource_ksql_query.go +++ b/ksql/resource_ksql_query.go @@ -44,6 +44,13 @@ func resourceQuery() *schema.Resource { ForceNew: true, Default: false, }, + "ignore_already_exists": { + Description: "Ignore already exists errors.", + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + Default: false, + }, "credentials": { Type: schema.TypeList, Optional: true, @@ -96,13 +103,14 @@ func resourceQueryCreate(ctx context.Context, d *schema.ResourceData, m interfac ) var ( - diags diag.Diagnostics - name = d.Get("name").(string) - qType = d.Get("type").(string) - query = d.Get("query").(string) + diags diag.Diagnostics + name = d.Get("name").(string) + qType = d.Get("type").(string) + query = d.Get("query").(string) + ignoreAlreadyExists = d.Get("ignore_already_exists").(bool) ) - id, err := cli.ExecuteQuery(context.Background(), name, qType, query) + id, err := cli.ExecuteQuery(context.Background(), name, qType, query, ignoreAlreadyExists) if err != nil { return diag.FromErr(err) } @@ -142,6 +150,7 @@ func resourceQueryDelete(ctx context.Context, d *schema.ResourceData, m interfac queryType = d.Get("type").(string) queryName = d.Get("name").(string) deleteTopicOnDestroy = d.Get("delete_topic_on_destroy").(bool) + ignoreAlreadyExists = d.Get("ignore_already_exists").(bool) ) buf := &bytes.Buffer{} @@ -154,7 +163,7 @@ func resourceQueryDelete(ctx context.Context, d *schema.ResourceData, m interfac } buf.WriteString(" ;") - _, err := cli.ExecuteQuery(ctx, queryName, queryType, buf.String()) + _, err := cli.ExecuteQuery(ctx, queryName, queryType, buf.String(), ignoreAlreadyExists) if err != nil { return diag.Errorf("failed to drop ksql resource %q: %s", d.Id(), err) }