Skip to content

Commit

Permalink
ignoreAlreadyExists errors flag added
Browse files Browse the repository at this point in the history
  • Loading branch information
serhiimakogon committed Jan 26, 2023
1 parent 66bd364 commit ec4da70
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
7 changes: 5 additions & 2 deletions ksql/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *Client) RotateCredentials(url, username, password string) {
}
}

func (c *Client) ExecuteQuery(ctx context.Context, name, qType, query string) (string, error) {
func (c *Client) ExecuteQuery(ctx context.Context, name, qType, query string, ignoreAlreadyExists bool) (string, error) {
var (
err error
res Response
Expand All @@ -64,10 +64,13 @@ func (c *Client) ExecuteQuery(ctx context.Context, name, qType, query string) (s
}

if res.ErrorCode != 0 {
if ignoreAlreadyExists && strings.Contains(res.Message, "already exists") {
break
}
err = fmt.Errorf("invalid ksql response %s", res.Message)
if strings.HasPrefix(query, "DROP") {
if terminateQuery, shouldTerminate := c.getPreHookTerminateQuery(res.Message); shouldTerminate {
_, err = c.ExecuteQuery(ctx, name, qType, terminateQuery)
_, err = c.ExecuteQuery(ctx, name, qType, terminateQuery, ignoreAlreadyExists)
}
}
tflog.Warn(ctx, fmt.Sprintf("failed to make post ksql request [%v] retrying...", err))
Expand Down
21 changes: 15 additions & 6 deletions ksql/resource_ksql_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ func resourceQuery() *schema.Resource {
ForceNew: true,
Default: false,
},
"ignore_already_exists": {
Description: "Ignore already exists errors.",
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
Default: false,
},
"credentials": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -96,13 +103,14 @@ func resourceQueryCreate(ctx context.Context, d *schema.ResourceData, m interfac
)

var (
diags diag.Diagnostics
name = d.Get("name").(string)
qType = d.Get("type").(string)
query = d.Get("query").(string)
diags diag.Diagnostics
name = d.Get("name").(string)
qType = d.Get("type").(string)
query = d.Get("query").(string)
ignoreAlreadyExists = d.Get("ignore_already_exists").(bool)
)

id, err := cli.ExecuteQuery(context.Background(), name, qType, query)
id, err := cli.ExecuteQuery(context.Background(), name, qType, query, ignoreAlreadyExists)
if err != nil {
return diag.FromErr(err)
}
Expand Down Expand Up @@ -142,6 +150,7 @@ func resourceQueryDelete(ctx context.Context, d *schema.ResourceData, m interfac
queryType = d.Get("type").(string)
queryName = d.Get("name").(string)
deleteTopicOnDestroy = d.Get("delete_topic_on_destroy").(bool)
ignoreAlreadyExists = d.Get("ignore_already_exists").(bool)
)

buf := &bytes.Buffer{}
Expand All @@ -154,7 +163,7 @@ func resourceQueryDelete(ctx context.Context, d *schema.ResourceData, m interfac
}
buf.WriteString(" ;")

_, err := cli.ExecuteQuery(ctx, queryName, queryType, buf.String())
_, err := cli.ExecuteQuery(ctx, queryName, queryType, buf.String(), ignoreAlreadyExists)
if err != nil {
return diag.Errorf("failed to drop ksql resource %q: %s", d.Id(), err)
}
Expand Down

0 comments on commit ec4da70

Please sign in to comment.