Skip to content

Commit

Permalink
[extension/dbstorage] Add DB Transactions to dbstorage.Batch() method
Browse files Browse the repository at this point in the history
  • Loading branch information
Fiery-Fenix committed Feb 10, 2025
1 parent 1f8c1ee commit c6641c6
Show file tree
Hide file tree
Showing 8 changed files with 388 additions and 27 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feat_dbstorage-transaction-support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: dbstorageextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add DB Transactions to dbstorage.Batch() method as it is expected by Storage API

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37805]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
77 changes: 53 additions & 24 deletions extension/storage/dbstorage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type dbStorageClient struct {

func newClient(ctx context.Context, driverName string, db *sql.DB, tableName string) (*dbStorageClient, error) {
createTableSQL := createTable
if driverName == "sqlite" {
if driverName == driverSqlite {
createTableSQL = createTableSqlite
}
var err error
Expand All @@ -59,45 +59,37 @@ func newClient(ctx context.Context, driverName string, db *sql.DB, tableName str

// Get will retrieve data from storage that corresponds to the specified key
func (c *dbStorageClient) Get(ctx context.Context, key string) ([]byte, error) {
rows, err := c.getQuery.QueryContext(ctx, key)
if err != nil {
return nil, err
}
if !rows.Next() {
return nil, nil
}
var result []byte
err = rows.Scan(&result)
if err != nil {
return result, err
}
err = rows.Close()
return result, err
return c.get(ctx, key, nil)
}

// Set will store data. The data can be retrieved using the same key
func (c *dbStorageClient) Set(ctx context.Context, key string, value []byte) error {
_, err := c.setQuery.ExecContext(ctx, key, value, value)
return err
return c.set(ctx, key, value, nil)
}

// Delete will delete data associated with the specified key
func (c *dbStorageClient) Delete(ctx context.Context, key string) error {
_, err := c.deleteQuery.ExecContext(ctx, key)
return err
return c.delete(ctx, key, nil)
}

// Batch executes the specified operations in order. Get operation results are updated in place
func (c *dbStorageClient) Batch(ctx context.Context, ops ...*storage.Operation) error {
var err error
// Start a new transaction
tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
return err
}
//nolint:errcheck
defer tx.Rollback()

for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value, err = c.Get(ctx, op.Key)
op.Value, err = c.get(ctx, op.Key, tx)
case storage.Set:
err = c.Set(ctx, op.Key, op.Value)
err = c.set(ctx, op.Key, op.Value, tx)
case storage.Delete:
err = c.Delete(ctx, op.Key)
err = c.delete(ctx, op.Key, tx)
default:
return errors.New("wrong operation type")
}
Expand All @@ -106,7 +98,8 @@ func (c *dbStorageClient) Batch(ctx context.Context, ops ...*storage.Operation)
return err
}
}
return err

return tx.Commit()
}

// Close will close the database
Expand All @@ -119,3 +112,39 @@ func (c *dbStorageClient) Close(_ context.Context) error {
}
return c.getQuery.Close()
}

func (c *dbStorageClient) get(ctx context.Context, key string, tx *sql.Tx) ([]byte, error) {
rows, err := c.wrapTx(c.getQuery, tx).QueryContext(ctx, key)
if err != nil {
return nil, err
}

if !rows.Next() {
return nil, nil
}

var result []byte
if err := rows.Scan(&result); err != nil {
return result, err
}

return result, rows.Close()
}

func (c *dbStorageClient) set(ctx context.Context, key string, value []byte, tx *sql.Tx) error {
_, err := c.wrapTx(c.setQuery, tx).ExecContext(ctx, key, value, value)
return err
}

func (c *dbStorageClient) delete(ctx context.Context, key string, tx *sql.Tx) error {
_, err := c.wrapTx(c.deleteQuery, tx).ExecContext(ctx, key)
return err
}

func (c *dbStorageClient) wrapTx(stmt *sql.Stmt, tx *sql.Tx) *sql.Stmt {
if tx != nil {
return tx.Stmt(stmt)
}

return stmt
}
Loading

0 comments on commit c6641c6

Please sign in to comment.