Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

fix: Delete by cq_id before insertion #266

Merged
merged 3 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions database/postgres/pgdatabase.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,17 @@ func NewPgDatabase(ctx context.Context, logger hclog.Logger, dsn string, sd sche
var _ execution.Storage = (*PgDatabase)(nil)

// Insert inserts all resources to given table, table and resources are assumed from same table.
func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schema.Resources) error {
func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
if len(resources) == 0 {
return nil
}

if shouldCascade {
if err := deleteResourceByCQId(ctx, p.pool, resources, cascadeDeleteFilters); err != nil {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an alternative, we could enable ON CONFLICT UPDATE, but that checks for PK collisions (which we still want to see and have reported) and not necessarily for a cq_id collision.

Or running this + the insert below in a TX might be a (costly?) option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think one of the above needs to be the solution, because deleting data during a fetch can cause a bad user experience, even if the data is then re-inserted anyway (@yevgenypats @roneli @bbernays).

Not sure which we should choose between TX and ON CONFLICT UPDATE. One point in favor of using deleteResourceByCQId is consistency with the current behaviour in copyFrom, i.e., if we delete by cq_id in copyFrom, we should do the same kind of deletion in the fallback.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point I don't think I have ever seen an issue that stems from DB performance on insert. The only incident that I am aware of is that someone had issues because they needed to vacuum their db. So overall, I am not very concerned about DB performance as our biggest bottleneck is by far and away the rate we can grab data from provider APIs...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That being said, I like the idea and consistency of doing the deletion + insertion in a TX for each of our 3 methods of insertion (copy, insert multiple and singular insert)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 0264392

return err
}
}

// It is safe to assume that all resources have the same columns
cols := quoteColumns(resources.ColumnNames())
psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
Expand Down Expand Up @@ -104,16 +111,7 @@ func (p PgDatabase) CopyFrom(ctx context.Context, resources schema.Resources, sh
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
if shouldCascade {
q := goqu.Dialect("postgres").Delete(resources.TableName()).Where(goqu.Ex{"cq_id": resources.GetIds()})
for k, v := range cascadeDeleteFilters {
q = q.Where(goqu.Ex{k: goqu.Op{"eq": v}})
}
sql, args, err := q.Prepared(true).ToSQL()
if err != nil {
return err
}
_, err = tx.Exec(ctx, sql, args...)
if err != nil {
if err := deleteResourceByCQId(ctx, tx, resources, cascadeDeleteFilters); err != nil {
return err
}
}
Expand Down Expand Up @@ -221,3 +219,20 @@ func quoteColumns(columns []string) []string {
}
return ret
}

type execer interface {
disq marked this conversation as resolved.
Show resolved Hide resolved
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
}

// deleteResourceByCQId deletes rows from the table named by resources.TableName()
// whose cq_id is one of resources.GetIds(). Every entry in cascadeDeleteFilters
// adds a further "column = value" condition to the DELETE.
//
// The statement is built as a prepared (placeholder-based) query and run through
// db, which may be anything exposing Exec, so the caller decides whether the
// delete runs inside a transaction. Any error from building or executing the
// statement is returned unchanged.
func deleteResourceByCQId(ctx context.Context, db execer, resources schema.Resources, cascadeDeleteFilters map[string]interface{}) error {
	stmt := goqu.Dialect("postgres").
		Delete(resources.TableName()).
		Where(goqu.Ex{"cq_id": resources.GetIds()})

	// Narrow the delete with one equality condition per extra filter.
	for column, value := range cascadeDeleteFilters {
		stmt = stmt.Where(goqu.Ex{column: goqu.Op{"eq": value}})
	}

	query, params, buildErr := stmt.Prepared(true).ToSQL()
	if buildErr != nil {
		return buildErr
	}

	if _, execErr := db.Exec(ctx, query, params...); execErr != nil {
		return execErr
	}
	return nil
}
4 changes: 2 additions & 2 deletions provider/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (e TableExecutor) saveToStorage(ctx context.Context, resources schema.Resou
e.Logger.Warn("failed copy-from to db", "error", err)

// fallback insert, copy from sometimes does problems, so we fall back with bulk insert
err = e.Db.Insert(ctx, e.Table, resources)
err = e.Db.Insert(ctx, e.Table, resources, shouldCascade, e.extraFields)
if err == nil {
return resources, nil
}
Expand All @@ -355,7 +355,7 @@ func (e TableExecutor) saveToStorage(ctx context.Context, resources schema.Resou
// Try to insert resource by resource if partial fetch is enabled and an error occurred
partialFetchResources := make(schema.Resources, 0)
for id := range resources {
if err := e.Db.Insert(ctx, e.Table, schema.Resources{resources[id]}); err != nil {
if err := e.Db.Insert(ctx, e.Table, schema.Resources{resources[id]}, shouldCascade, e.extraFields); err != nil {
e.Logger.Error("failed to insert resource into db", "error", err, "resource_keys", resources[id].PrimaryKeyValues())
diags = diags.Add(ClassifyError(err, diag.WithType(diag.DATABASE)))
continue
Expand Down
6 changes: 3 additions & 3 deletions provider/execution/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ func (_m *DatabaseMock) Exec(ctx context.Context, query string, args ...interfac
}

// Insert provides a mock function with given fields: ctx, t, instance
func (_m *DatabaseMock) Insert(ctx context.Context, t *schema.Table, instance schema.Resources) error {
func (_m *DatabaseMock) Insert(ctx context.Context, t *schema.Table, instance schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
ret := _m.Called(ctx, t, instance)

var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *schema.Table, schema.Resources) error); ok {
r0 = rf(ctx, t, instance)
if rf, ok := ret.Get(0).(func(context.Context, *schema.Table, schema.Resources, bool, map[string]interface{}) error); ok {
r0 = rf(ctx, t, instance, shouldCascade, cascadeDeleteFilters)
} else {
r0 = ret.Error(0)
}
Expand Down
4 changes: 2 additions & 2 deletions provider/execution/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
type Storage interface {
QueryExecer
Copier
Insert(ctx context.Context, t *schema.Table, instance schema.Resources) error
Insert(ctx context.Context, t *schema.Table, instance schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error
Delete(ctx context.Context, t *schema.Table, kvFilters []interface{}) error
RemoveStaleData(ctx context.Context, t *schema.Table, executionStart time.Time, kvFilters []interface{}) error
CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, CascadeDeleteFilters map[string]interface{}) error
CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error
Close()
Dialect() schema.Dialect
}
Expand Down
4 changes: 2 additions & 2 deletions provider/execution/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (f noopStorage) Exec(ctx context.Context, query string, args ...interface{}
return nil
}

func (f noopStorage) Insert(ctx context.Context, t *schema.Table, instance schema.Resources) error {
func (f noopStorage) Insert(ctx context.Context, t *schema.Table, instance schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
return nil
}

Expand All @@ -33,7 +33,7 @@ func (f noopStorage) RemoveStaleData(ctx context.Context, t *schema.Table, execu
return nil
}

func (f noopStorage) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, CascadeDeleteFilters map[string]interface{}) error {
func (f noopStorage) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions provider/schema/mock/mock_storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.