From c6641c6126d6210b352326efacac15215b001ead Mon Sep 17 00:00:00 2001 From: Fiery-Fenix Date: Mon, 10 Feb 2025 14:01:20 +0200 Subject: [PATCH] [extension/dbstorage] Add DB Transactions to dbstorage.Batch() method --- .../feat_dbstorage-transaction-support.yaml | 27 ++ extension/storage/dbstorage/client.go | 77 +++-- extension/storage/dbstorage/client_test.go | 285 ++++++++++++++++++ extension/storage/dbstorage/config.go | 10 + extension/storage/dbstorage/config_test.go | 8 +- extension/storage/dbstorage/extension_test.go | 4 +- extension/storage/dbstorage/go.mod | 1 + extension/storage/dbstorage/go.sum | 3 + 8 files changed, 388 insertions(+), 27 deletions(-) create mode 100644 .chloggen/feat_dbstorage-transaction-support.yaml create mode 100644 extension/storage/dbstorage/client_test.go diff --git a/.chloggen/feat_dbstorage-transaction-support.yaml b/.chloggen/feat_dbstorage-transaction-support.yaml new file mode 100644 index 000000000000..9ad3ff89bbf6 --- /dev/null +++ b/.chloggen/feat_dbstorage-transaction-support.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: dbstorageextension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add DB Transactions to dbstorage.Batch() method as it is expected by Storage API + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37805] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/extension/storage/dbstorage/client.go b/extension/storage/dbstorage/client.go index 7f40e213fb94..977b50746dce 100644 --- a/extension/storage/dbstorage/client.go +++ b/extension/storage/dbstorage/client.go @@ -33,7 +33,7 @@ type dbStorageClient struct { func newClient(ctx context.Context, driverName string, db *sql.DB, tableName string) (*dbStorageClient, error) { createTableSQL := createTable - if driverName == "sqlite" { + if driverName == driverSqlite { createTableSQL = createTableSqlite } var err error @@ -59,45 +59,37 @@ func newClient(ctx context.Context, driverName string, db *sql.DB, tableName str // Get will retrieve data from storage that corresponds to the specified key func (c *dbStorageClient) Get(ctx context.Context, key string) ([]byte, error) { - rows, err := c.getQuery.QueryContext(ctx, key) - if err != nil { - return nil, err - } - if !rows.Next() { - return nil, nil - } - var result []byte - err = rows.Scan(&result) - if err != nil { - return result, err - } - err = rows.Close() - return result, err + return c.get(ctx, key, nil) } // Set will store data. The data can be retrieved using the same key func (c *dbStorageClient) Set(ctx context.Context, key string, value []byte) error { - _, err := c.setQuery.ExecContext(ctx, key, value, value) - return err + return c.set(ctx, key, value, nil) } // Delete will delete data associated with the specified key func (c *dbStorageClient) Delete(ctx context.Context, key string) error { - _, err := c.deleteQuery.ExecContext(ctx, key) - return err + return c.delete(ctx, key, nil) } // Batch executes the specified operations in order. Get operation results are updated in place func (c *dbStorageClient) Batch(ctx context.Context, ops ...*storage.Operation) error { - var err error + // Start a new transaction + tx, err := c.db.BeginTx(ctx, nil) + if err != nil { + return err + } + //nolint:errcheck + defer tx.Rollback() + for _, op := range ops { switch op.Type { case storage.Get: - op.Value, err = c.Get(ctx, op.Key) + op.Value, err = c.get(ctx, op.Key, tx) case storage.Set: - err = c.Set(ctx, op.Key, op.Value) + err = c.set(ctx, op.Key, op.Value, tx) case storage.Delete: - err = c.Delete(ctx, op.Key) + err = c.delete(ctx, op.Key, tx) default: return errors.New("wrong operation type") } @@ -106,7 +98,8 @@ func (c *dbStorageClient) Batch(ctx context.Context, ops ...*storage.Operation) return err } } - return err + + return tx.Commit() } // Close will close the database @@ -119,3 +112,39 @@ func (c *dbStorageClient) Close(_ context.Context) error { } return c.getQuery.Close() } + +func (c *dbStorageClient) get(ctx context.Context, key string, tx *sql.Tx) ([]byte, error) { + rows, err := c.wrapTx(c.getQuery, tx).QueryContext(ctx, key) + if err != nil { + return nil, err + } + + if !rows.Next() { + return nil, nil + } + + var result []byte + if err := rows.Scan(&result); err != nil { + return result, err + } + + return result, rows.Close() +} + +func (c *dbStorageClient) set(ctx context.Context, key string, value []byte, tx *sql.Tx) error { + _, err := c.wrapTx(c.setQuery, tx).ExecContext(ctx, key, value, value) + return err +} + +func (c *dbStorageClient) delete(ctx context.Context, key string, tx *sql.Tx) error { + _, err := c.wrapTx(c.deleteQuery, tx).ExecContext(ctx, key) + return err +} + +func (c *dbStorageClient) wrapTx(stmt *sql.Stmt, tx *sql.Tx) *sql.Stmt { + if tx != nil { + return tx.Stmt(stmt) + } + + return stmt +} diff --git a/extension/storage/dbstorage/client_test.go b/extension/storage/dbstorage/client_test.go new file mode 100644 index 000000000000..2056cacf14c6 --- /dev/null +++ b/extension/storage/dbstorage/client_test.go @@ -0,0 +1,285 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package dbstorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/dbstorage" + +import ( + "context" + "database/sql" + "fmt" + "regexp" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/extension/xextension/storage" +) + +var testTableName = "exporter_otlp_test" + +func Test_newClient(t *testing.T) { + t.Run("Should return client with generic query(s)", func(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(createTable, testTableName))). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(getQueryText, testTableName))) + mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(setQueryText, testTableName))) + mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(deleteQueryText, testTableName))) + + _, err = newClient(context.Background(), driverPostgresql, db, testTableName) + assert.NoError(t, err) + }) + t.Run("Should return client with Sqlite specific query(s)", func(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(createTableSqlite, testTableName))). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(getQueryText, testTableName))) + mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(setQueryText, testTableName))) + mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(deleteQueryText, testTableName))) + + _, err = newClient(context.Background(), driverSqlite, db, testTableName) + assert.NoError(t, err) + }) +} + +func Test_dbStorageClient_Get(t *testing.T) { + t.Run("Should get a row without transaction", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectQuery(regexp.QuoteMeta(fmt.Sprintf(getQueryText, testTableName))). + WithArgs("test"). + WillReturnRows(sqlmock.NewRows([]string{"value"}).AddRow([]byte("value"))) + + got, err := client.Get(context.Background(), "test") + assert.NoError(t, err) + assert.Equal(t, []byte("value"), got) + }) + t.Run("Should get a row within transaction", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectBegin() + mock.ExpectQuery(regexp.QuoteMeta(fmt.Sprintf(getQueryText, testTableName))). + WithArgs("test"). + WillReturnRows(sqlmock.NewRows([]string{"value"}).AddRow([]byte("value"))) + mock.ExpectCommit() + + tx, err := client.db.BeginTx(context.Background(), nil) + require.NoError(t, err) + + got, err := client.get(context.Background(), "test", tx) + assert.NoError(t, err) + assert.Equal(t, []byte("value"), got) + assert.NoError(t, tx.Commit()) + }) + t.Run("Should return only first row from set", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectQuery(regexp.QuoteMeta(fmt.Sprintf(getQueryText, testTableName))). + WithArgs("test"). + WillReturnRows(sqlmock.NewRows([]string{"value"}).AddRow([]byte("first")).AddRow([]byte("second"))) + + got, err := client.Get(context.Background(), "test") + assert.NoError(t, err) + assert.Equal(t, []byte("first"), got) + }) + t.Run("Shouldn't return error if no records selected", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectQuery(regexp.QuoteMeta(fmt.Sprintf(getQueryText, testTableName))). + WithArgs("test"). + WillReturnRows(sqlmock.NewRows([]string{"value"})) + + got, err := client.Get(context.Background(), "test") + assert.NoError(t, err) + assert.Nil(t, got) + }) +} + +func Test_dbStorageClient_Set(t *testing.T) { + t.Run("Should delete a row without transaction", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(setQueryText, testTableName))). + WithArgs("test", []byte("value"), []byte("value")). + WillReturnResult(sqlmock.NewResult(1, 1)) + + assert.NoError(t, client.Set(context.Background(), "test", []byte("value"))) + }) + t.Run("Should delete a row within transaction", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(setQueryText, testTableName))). + WithArgs("test", []byte("value"), []byte("value")). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + tx, err := client.db.BeginTx(context.Background(), nil) + require.NoError(t, err) + + assert.NoError(t, client.set(context.Background(), "test", []byte("value"), tx)) + assert.NoError(t, tx.Commit()) + }) + t.Run("Shouldn't return error if no record exists", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(setQueryText, testTableName))). + WithArgs("test", []byte("value"), []byte("value")). + WillReturnResult(sqlmock.NewResult(0, 0)) + + assert.NoError(t, client.Set(context.Background(), "test", []byte("value"))) + }) +} + +func Test_dbStorageClient_Delete(t *testing.T) { + t.Run("Should delete a row without transaction", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(deleteQueryText, testTableName))). + WithArgs("test"). + WillReturnResult(sqlmock.NewResult(1, 1)) + + assert.NoError(t, client.Delete(context.Background(), "test")) + }) + t.Run("Should delete a row within transaction", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(deleteQueryText, testTableName))). + WithArgs("test"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + tx, err := client.db.BeginTx(context.Background(), nil) + require.NoError(t, err) + + assert.NoError(t, client.delete(context.Background(), "test", tx)) + assert.NoError(t, tx.Commit()) + }) + t.Run("Shouldn't return error if no record exists", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(deleteQueryText, testTableName))). + WithArgs("test"). + WillReturnResult(sqlmock.NewResult(0, 0)) + + assert.NoError(t, client.Delete(context.Background(), "test")) + }) +} + +func Test_dbStorageClient_Batch(t *testing.T) { + t.Run("Should run set of operations in a single transaction per call", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(setQueryText, testTableName))). + WithArgs("test", []byte("value"), []byte("value")). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(deleteQueryText, testTableName))). + WithArgs("test"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(deleteQueryText, testTableName))). + WithArgs("test"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + ops := []*storage.Operation{ + storage.SetOperation("test", []byte("value")), + storage.DeleteOperation("test"), + } + assert.NoError(t, client.Batch(context.Background(), ops...)) + assert.NoError(t, client.Batch(context.Background(), ops[1])) + }) + t.Run("Should return error if any operation failed, with transaction rollback", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(setQueryText, testTableName))). + WithArgs("test", []byte("value"), []byte("value")). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(regexp.QuoteMeta(fmt.Sprintf(deleteQueryText, testTableName))). + WithArgs("test"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectQuery(regexp.QuoteMeta(fmt.Sprintf(getQueryText, testTableName))). + WithArgs("test"). + WillReturnError(sql.ErrConnDone) + mock.ExpectRollback() + + ops := []*storage.Operation{ + storage.SetOperation("test", []byte("value")), + storage.DeleteOperation("test"), + storage.GetOperation("test"), + } + assert.ErrorIs(t, client.Batch(context.Background(), ops...), sql.ErrConnDone) + }) +} + +func Test_dbStorageClient_wrapTx(t *testing.T) { + t.Run("should wrap prepared statement in transaction", func(t *testing.T) { + client, mock := newTestClient(t) + defer client.db.Close() + + mock.ExpectBegin() + mock.ExpectRollback() + + tx, err := client.db.BeginTx(context.Background(), nil) + require.NoError(t, err) + //nolint:errcheck + defer tx.Rollback() + + stmt := client.getQuery + assert.NotEqual(t, stmt, client.wrapTx(stmt, tx)) + }) + t.Run("shouldn't wrap prepared statement without transaction", func(t *testing.T) { + client, _ := newTestClient(t) + defer client.db.Close() + + stmt := client.getQuery + assert.Equal(t, stmt, client.wrapTx(stmt, nil)) + }) +} + +func newTestClient(t *testing.T) (*dbStorageClient, sqlmock.Sqlmock) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + + mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(getQueryText, testTableName))) + mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(setQueryText, testTableName))) + mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(deleteQueryText, testTableName))) + + selectQuery, err := db.PrepareContext(context.Background(), fmt.Sprintf(getQueryText, testTableName)) + require.NoError(t, err) + setQuery, err := db.PrepareContext(context.Background(), fmt.Sprintf(setQueryText, testTableName)) + require.NoError(t, err) + deleteQuery, err := db.PrepareContext(context.Background(), fmt.Sprintf(deleteQueryText, testTableName)) + require.NoError(t, err) + + return &dbStorageClient{ + db: db, + getQuery: selectQuery, + setQuery: setQuery, + deleteQuery: deleteQuery, + }, mock +} diff --git a/extension/storage/dbstorage/config.go b/extension/storage/dbstorage/config.go index 01a03b1bea22..54fa3c586677 100644 --- a/extension/storage/dbstorage/config.go +++ b/extension/storage/dbstorage/config.go @@ -5,6 +5,12 @@ package dbstorage // import "github.com/open-telemetry/opentelemetry-collector-c import ( "errors" + "fmt" +) + +const ( + driverPostgresql = "pgx" + driverSqlite = "sqlite3" ) // Config defines configuration for dbstorage extension. @@ -21,5 +27,9 @@ func (cfg *Config) Validate() error { return errors.New("missing driver name") } + if cfg.DriverName != driverPostgresql && cfg.DriverName != driverSqlite { + return fmt.Errorf("unsupported driver %s", cfg.DriverName) + } + return nil } diff --git a/extension/storage/dbstorage/config_test.go b/extension/storage/dbstorage/config_test.go index a04c1b8f7d84..66476274a020 100644 --- a/extension/storage/dbstorage/config_test.go +++ b/extension/storage/dbstorage/config_test.go @@ -5,6 +5,7 @@ package dbstorage // import "github.com/open-telemetry/opentelemetry-collector-c import ( "errors" + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -27,8 +28,13 @@ func TestConfigValidate(t *testing.T) { errors.New("missing datasource"), }, { - "valid", + "Unknown driver", Config{DriverName: "foo", DataSource: "bar"}, + fmt.Errorf("unsupported driver %s", "foo"), + }, + { + "Valid", + Config{DriverName: driverSqlite, DataSource: "bar"}, nil, }, } diff --git a/extension/storage/dbstorage/extension_test.go b/extension/storage/dbstorage/extension_test.go index 91925971bd1c..063b474446ce 100644 --- a/extension/storage/dbstorage/extension_test.go +++ b/extension/storage/dbstorage/extension_test.go @@ -120,7 +120,7 @@ func testExtensionIntegrity(t *testing.T, se storage.Extension) { func newSqliteTestExtension(t *testing.T) storage.Extension { f := NewFactory() cfg := f.CreateDefaultConfig().(*Config) - cfg.DriverName = "sqlite3" + cfg.DriverName = driverSqlite cfg.DataSource = fmt.Sprintf("file:%s/foo.db?_busy_timeout=10000&_journal=WAL&_sync=NORMAL", t.TempDir()) extension, err := f.Create(context.Background(), extensiontest.NewNopSettings(), cfg) @@ -162,7 +162,7 @@ func newPostgresTestExtension(t *testing.T) storage.Extension { }) f := NewFactory() cfg := f.CreateDefaultConfig().(*Config) - cfg.DriverName = "pgx" + cfg.DriverName = driverPostgresql cfg.DataSource = fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable", "127.0.0.1", port.Port(), "root", "passwd", "db") extension, err := f.Create(context.Background(), extensiontest.NewNopSettings(), cfg) diff --git a/extension/storage/dbstorage/go.mod b/extension/storage/dbstorage/go.mod index 67d8b1cc4ef0..76c65bd3d06e 100644 --- a/extension/storage/dbstorage/go.mod +++ b/extension/storage/dbstorage/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/extension/stora go 1.22.0 require ( + github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/docker/docker v27.5.1+incompatible github.com/docker/go-connections v0.5.0 github.com/jackc/pgx/v5 v5.7.2 diff --git a/extension/storage/dbstorage/go.sum b/extension/storage/dbstorage/go.sum index 0992ffbdbecf..cdc7fbb2b44e 100644 --- a/extension/storage/dbstorage/go.sum +++ b/extension/storage/dbstorage/go.sum @@ -4,6 +4,8 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9 github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -60,6 +62,7 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs=