Skip to content

Commit

Permalink
Merge branch 'cadence-workflow:master' into default-domain
Browse files Browse the repository at this point in the history
  • Loading branch information
samkitshah1262 authored Feb 4, 2025
2 parents a4f259c + 9c01fa8 commit 747f55f
Show file tree
Hide file tree
Showing 18 changed files with 396 additions and 212 deletions.
20 changes: 10 additions & 10 deletions common/persistence/sql/sqlplugin/mysql/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,52 +61,52 @@ const (
)

// CreateSchemaVersionTables sets up the schema version tables
func (mdb *db) CreateSchemaVersionTables() error {
func (mdb *DB) CreateSchemaVersionTables() error {
if err := mdb.ExecSchemaOperationQuery(context.Background(), createSchemaVersionTableQuery); err != nil {
return err
}
return mdb.ExecSchemaOperationQuery(context.Background(), createSchemaUpdateHistoryTableQuery)
}

// ReadSchemaVersion returns the current schema version for the keyspace
func (mdb *db) ReadSchemaVersion(database string) (string, error) {
func (mdb *DB) ReadSchemaVersion(database string) (string, error) {
var version string
err := mdb.driver.GetForSchemaQuery(sqlplugin.DbShardUndefined, &version, readSchemaVersionQuery, database)
return version, err
}

// UpdateSchemaVersion updates the schema version for the keyspace
func (mdb *db) UpdateSchemaVersion(database string, newVersion string, minCompatibleVersion string) error {
func (mdb *DB) UpdateSchemaVersion(database string, newVersion string, minCompatibleVersion string) error {
return mdb.ExecSchemaOperationQuery(context.Background(), writeSchemaVersionQuery, database, time.Now(), newVersion, minCompatibleVersion)
}

// WriteSchemaUpdateLog adds an entry to the schema update history table
func (mdb *db) WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error {
func (mdb *DB) WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error {
now := time.Now().UTC()
return mdb.ExecSchemaOperationQuery(context.Background(), writeSchemaUpdateHistoryQuery, now.Year(), int(now.Month()), now, oldVersion, newVersion, manifestMD5, desc)
}

// ExecSchemaOperationQuery executes a sql statement for schema ONLY. DO NOT use it in other cases, otherwise it will not work for multiple SQL database.
// For Sharded SQL, it will execute the statement for all shards
func (mdb *db) ExecSchemaOperationQuery(ctx context.Context, stmt string, args ...interface{}) error {
func (mdb *DB) ExecSchemaOperationQuery(ctx context.Context, stmt string, args ...interface{}) error {
_, err := mdb.driver.ExecDDL(ctx, sqlplugin.DbShardUndefined, stmt, args...)
return err
}

// ListTables returns a list of tables in this database
func (mdb *db) ListTables(database string) ([]string, error) {
func (mdb *DB) ListTables(database string) ([]string, error) {
var tables []string
err := mdb.driver.SelectForSchemaQuery(sqlplugin.DbShardUndefined, &tables, fmt.Sprintf(listTablesQuery, database))
return tables, err
}

// DropTable drops a given table from the database
func (mdb *db) DropTable(name string) error {
func (mdb *DB) DropTable(name string) error {
return mdb.ExecSchemaOperationQuery(context.Background(), fmt.Sprintf(dropTableQuery, name))
}

// DropAllTables drops all tables from this database
func (mdb *db) DropAllTables(database string) error {
func (mdb *DB) DropAllTables(database string) error {
tables, err := mdb.ListTables(database)
if err != nil {
return err
Expand All @@ -120,11 +120,11 @@ func (mdb *db) DropAllTables(database string) error {
}

// CreateDatabase creates a database if it doesn't exist
func (mdb *db) CreateDatabase(name string) error {
func (mdb *DB) CreateDatabase(name string) error {
return mdb.ExecSchemaOperationQuery(context.Background(), fmt.Sprintf(createDatabaseQuery, name))
}

// DropDatabase drops a database
func (mdb *db) DropDatabase(name string) error {
func (mdb *DB) DropDatabase(name string) error {
return mdb.ExecSchemaOperationQuery(context.Background(), fmt.Sprintf(dropDatabaseQuery, name))
}
8 changes: 4 additions & 4 deletions common/persistence/sql/sqlplugin/mysql/configstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ import (
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
)

func (mdb *db) InsertConfig(ctx context.Context, row *persistence.InternalConfigStoreEntry) error {
_, err := mdb.driver.ExecContext(ctx, sqlplugin.DbDefaultShard, _insertConfigQuery, row.RowType, -1*row.Version, mdb.converter.ToMySQLDateTime(row.Timestamp), row.Values.Data, row.Values.Encoding)
func (mdb *DB) InsertConfig(ctx context.Context, row *persistence.InternalConfigStoreEntry) error {
_, err := mdb.driver.ExecContext(ctx, sqlplugin.DbDefaultShard, _insertConfigQuery, row.RowType, -1*row.Version, mdb.converter.ToDateTime(row.Timestamp), row.Values.Data, row.Values.Encoding)
return err
}

func (mdb *db) SelectLatestConfig(ctx context.Context, rowType int) (*persistence.InternalConfigStoreEntry, error) {
func (mdb *DB) SelectLatestConfig(ctx context.Context, rowType int) (*persistence.InternalConfigStoreEntry, error) {
var row sqlplugin.ClusterConfigRow
err := mdb.driver.GetContext(ctx, sqlplugin.DbDefaultShard, &row, _selectLatestConfigQuery, rowType)
if err != nil {
Expand All @@ -45,7 +45,7 @@ func (mdb *db) SelectLatestConfig(ctx context.Context, rowType int) (*persistenc
return &persistence.InternalConfigStoreEntry{
RowType: row.RowType,
Version: row.Version,
Timestamp: mdb.converter.FromMySQLDateTime(row.Timestamp),
Timestamp: mdb.converter.FromDateTime(row.Timestamp),
Values: &persistence.DataBlob{
Data: row.Data,
Encoding: common.EncodingType(row.DataEncoding),
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/sql/sqlplugin/mysql/configstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestInsertConfig(t *testing.T) {
defer ctrl.Finish()

mockDriver := sqldriver.NewMockDriver(ctrl)
mdb := &db{driver: mockDriver, converter: &converter{}}
mdb := &DB{driver: mockDriver, converter: &converter{}}

// Setup mock expectations
tc.mockSetup(mockDriver)
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestSelectLatestConfig(t *testing.T) {
defer ctrl.Finish()

mockDriver := sqldriver.NewMockDriver(ctrl)
mdb := &db{driver: mockDriver, converter: &converter{}}
mdb := &DB{driver: mockDriver, converter: &converter{}}

tc.setupMock(mockDriver)

Expand Down
74 changes: 37 additions & 37 deletions common/persistence/sql/sqlplugin/mysql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,53 @@ import (
)

type (
db struct {
DB struct {
converter DataConverter
driver sqldriver.Driver
originalDBs []*sqlx.DB
numDBShards int
}
)

func (mdb *db) GetTotalNumDBShards() int {
// NewDB returns an instance of DB, which is a logical
// connection to the underlying mysql database
// dbShardID is needed when tx is not nil
func NewDB(xdbs []*sqlx.DB, tx *sqlx.Tx, dbShardID int, numDBShards int, converter DataConverter) (*DB, error) {
driver, err := sqldriver.NewDriver(xdbs, tx, dbShardID)
if err != nil {
return nil, err
}

db := &DB{
converter: converter,
originalDBs: xdbs, // this is kept because NewDB will be called again when starting a transaction
driver: driver,
numDBShards: numDBShards,
}

return db, nil
}

func (mdb *DB) GetTotalNumDBShards() int {
return mdb.numDBShards
}

var _ sqlplugin.AdminDB = (*db)(nil)
var _ sqlplugin.DB = (*db)(nil)
var _ sqlplugin.Tx = (*db)(nil)
var _ sqlplugin.AdminDB = (*DB)(nil)
var _ sqlplugin.DB = (*DB)(nil)
var _ sqlplugin.Tx = (*DB)(nil)

func (mdb *db) IsDupEntryError(err error) bool {
func (mdb *DB) IsDupEntryError(err error) bool {
sqlErr, ok := err.(*mysql.MySQLError)
// ErrDupEntry MySQL Error 1062 indicates a duplicate primary key i.e. the row already exists,
// so we don't do the insert and return a ConditionalUpdate error.
return ok && sqlErr.Number == mysqlerr.ER_DUP_ENTRY
}

func (mdb *db) IsNotFoundError(err error) bool {
func (mdb *DB) IsNotFoundError(err error) bool {
return err == sql.ErrNoRows
}

func (mdb *db) IsTimeoutError(err error) bool {
func (mdb *DB) IsTimeoutError(err error) bool {
if err == context.DeadlineExceeded {
return true
}
Expand All @@ -80,7 +99,7 @@ func (mdb *db) IsTimeoutError(err error) bool {
return false
}

func (mdb *db) IsThrottlingError(err error) bool {
func (mdb *DB) IsThrottlingError(err error) bool {
sqlErr, ok := err.(*mysql.MySQLError)
if ok {
if sqlErr.Number == mysqlerr.ER_CON_COUNT_ERROR ||
Expand All @@ -93,65 +112,46 @@ func (mdb *db) IsThrottlingError(err error) bool {
return false
}

// newDB returns an instance of DB, which is a logical
// connection to the underlying mysql database
// dbShardID is needed when tx is not nil
func newDB(xdbs []*sqlx.DB, tx *sqlx.Tx, dbShardID int, numDBShards int) (*db, error) {
driver, err := sqldriver.NewDriver(xdbs, tx, dbShardID)
if err != nil {
return nil, err
}

db := &db{
converter: &converter{},
originalDBs: xdbs, // this is kept because newDB will be called again when starting a transaction
driver: driver,
numDBShards: numDBShards,
}

return db, nil
}

// BeginTx starts a new transaction and returns a reference to the Tx object
func (mdb *db) BeginTx(ctx context.Context, dbShardID int) (sqlplugin.Tx, error) {
func (mdb *DB) BeginTx(ctx context.Context, dbShardID int) (sqlplugin.Tx, error) {
xtx, err := mdb.driver.BeginTxx(ctx, dbShardID, nil)
if err != nil {
return nil, err
}
return newDB(mdb.originalDBs, xtx, dbShardID, mdb.numDBShards)
return NewDB(mdb.originalDBs, xtx, dbShardID, mdb.numDBShards, mdb.converter)
}

// Commit commits a previously started transaction
func (mdb *db) Commit() error {
func (mdb *DB) Commit() error {
return mdb.driver.Commit()
}

// Rollback triggers rollback of a previously started transaction
func (mdb *db) Rollback() error {
func (mdb *DB) Rollback() error {
return mdb.driver.Rollback()
}

// Close closes the connection to the mysql db
func (mdb *db) Close() error {
func (mdb *DB) Close() error {
return mdb.driver.Close()
}

// PluginName returns the name of the mysql plugin
func (mdb *db) PluginName() string {
func (mdb *DB) PluginName() string {
return PluginName
}

// SupportsTTL returns weather MySQL supports TTL
func (mdb *db) SupportsTTL() bool {
func (mdb *DB) SupportsTTL() bool {
return false
}

// MaxAllowedTTL returns the max allowed ttl MySQL supports
func (mdb *db) MaxAllowedTTL() (*time.Duration, error) {
func (mdb *DB) MaxAllowedTTL() (*time.Duration, error) {
return nil, sqlplugin.ErrTTLNotSupported
}

// SupportsTTL returns weather MySQL supports Asynchronous transaction
func (mdb *db) SupportsAsyncTransaction() bool {
func (mdb *DB) SupportsAsyncTransaction() bool {
return false
}
18 changes: 9 additions & 9 deletions common/persistence/sql/sqlplugin/mysql/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ const (
var errMissingArgs = errors.New("missing one or more args for API")

// InsertIntoDomain inserts a single row into domains table
func (mdb *db) InsertIntoDomain(ctx context.Context, row *sqlplugin.DomainRow) (sql.Result, error) {
func (mdb *DB) InsertIntoDomain(ctx context.Context, row *sqlplugin.DomainRow) (sql.Result, error) {
return mdb.driver.ExecContext(ctx, sqlplugin.DbDefaultShard, createDomainQuery, row.ID, row.Name, row.IsGlobal, row.Data, row.DataEncoding)
}

// UpdateDomain updates a single row in domains table
func (mdb *db) UpdateDomain(ctx context.Context, row *sqlplugin.DomainRow) (sql.Result, error) {
func (mdb *DB) UpdateDomain(ctx context.Context, row *sqlplugin.DomainRow) (sql.Result, error) {
return mdb.driver.ExecContext(ctx, sqlplugin.DbDefaultShard, updateDomainQuery, row.Name, row.Data, row.DataEncoding, row.ID)
}

// SelectFromDomain reads one or more rows from domains table
func (mdb *db) SelectFromDomain(ctx context.Context, filter *sqlplugin.DomainFilter) ([]sqlplugin.DomainRow, error) {
func (mdb *DB) SelectFromDomain(ctx context.Context, filter *sqlplugin.DomainFilter) ([]sqlplugin.DomainRow, error) {
switch {
case filter.ID != nil || filter.Name != nil:
return mdb.selectFromDomain(ctx, filter)
Expand All @@ -81,7 +81,7 @@ func (mdb *db) SelectFromDomain(ctx context.Context, filter *sqlplugin.DomainFil
}
}

func (mdb *db) selectFromDomain(ctx context.Context, filter *sqlplugin.DomainFilter) ([]sqlplugin.DomainRow, error) {
func (mdb *DB) selectFromDomain(ctx context.Context, filter *sqlplugin.DomainFilter) ([]sqlplugin.DomainRow, error) {
var err error
var row sqlplugin.DomainRow
switch {
Expand All @@ -96,7 +96,7 @@ func (mdb *db) selectFromDomain(ctx context.Context, filter *sqlplugin.DomainFil
return []sqlplugin.DomainRow{row}, err
}

func (mdb *db) selectAllFromDomain(ctx context.Context, filter *sqlplugin.DomainFilter) ([]sqlplugin.DomainRow, error) {
func (mdb *DB) selectAllFromDomain(ctx context.Context, filter *sqlplugin.DomainFilter) ([]sqlplugin.DomainRow, error) {
var err error
var rows []sqlplugin.DomainRow
switch {
Expand All @@ -109,7 +109,7 @@ func (mdb *db) selectAllFromDomain(ctx context.Context, filter *sqlplugin.Domain
}

// DeleteFromDomain deletes a single row in domains table
func (mdb *db) DeleteFromDomain(ctx context.Context, filter *sqlplugin.DomainFilter) (sql.Result, error) {
func (mdb *DB) DeleteFromDomain(ctx context.Context, filter *sqlplugin.DomainFilter) (sql.Result, error) {
var err error
var result sql.Result
switch {
Expand All @@ -122,20 +122,20 @@ func (mdb *db) DeleteFromDomain(ctx context.Context, filter *sqlplugin.DomainFil
}

// LockDomainMetadata acquires a write lock on a single row in domain_metadata table
func (mdb *db) LockDomainMetadata(ctx context.Context) error {
func (mdb *DB) LockDomainMetadata(ctx context.Context) error {
var row sqlplugin.DomainMetadataRow
err := mdb.driver.GetContext(ctx, sqlplugin.DbDefaultShard, &row.NotificationVersion, lockDomainMetadataQuery)
return err
}

// SelectFromDomainMetadata reads a single row in domain_metadata table
func (mdb *db) SelectFromDomainMetadata(ctx context.Context) (*sqlplugin.DomainMetadataRow, error) {
func (mdb *DB) SelectFromDomainMetadata(ctx context.Context) (*sqlplugin.DomainMetadataRow, error) {
var row sqlplugin.DomainMetadataRow
err := mdb.driver.GetContext(ctx, sqlplugin.DbDefaultShard, &row.NotificationVersion, getDomainMetadataQuery)
return &row, err
}

// UpdateDomainMetadata updates a single row in domain_metadata table
func (mdb *db) UpdateDomainMetadata(ctx context.Context, row *sqlplugin.DomainMetadataRow) (sql.Result, error) {
func (mdb *DB) UpdateDomainMetadata(ctx context.Context, row *sqlplugin.DomainMetadataRow) (sql.Result, error) {
return mdb.driver.ExecContext(ctx, sqlplugin.DbDefaultShard, updateDomainMetadataQuery, row.NotificationVersion+1, row.NotificationVersion)
}
14 changes: 7 additions & 7 deletions common/persistence/sql/sqlplugin/mysql/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ const (
// For history_node table:

// InsertIntoHistoryNode inserts a row into history_node table
func (mdb *db) InsertIntoHistoryNode(ctx context.Context, row *sqlplugin.HistoryNodeRow) (sql.Result, error) {
func (mdb *DB) InsertIntoHistoryNode(ctx context.Context, row *sqlplugin.HistoryNodeRow) (sql.Result, error) {
// NOTE: Query 5.6 doesn't support clustering order, to workaround, we let txn_id multiple by -1
*row.TxnID *= -1
dbShardID := sqlplugin.GetDBShardIDFromTreeID(row.TreeID, mdb.GetTotalNumDBShards())
return mdb.driver.NamedExecContext(ctx, dbShardID, addHistoryNodesQuery, row)
}

// SelectFromHistoryNode reads one or more rows from history_node table
func (mdb *db) SelectFromHistoryNode(ctx context.Context, filter *sqlplugin.HistoryNodeFilter) ([]sqlplugin.HistoryNodeRow, error) {
func (mdb *DB) SelectFromHistoryNode(ctx context.Context, filter *sqlplugin.HistoryNodeFilter) ([]sqlplugin.HistoryNodeRow, error) {
var rows []sqlplugin.HistoryNodeRow
dbShardID := sqlplugin.GetDBShardIDFromTreeID(filter.TreeID, mdb.GetTotalNumDBShards())
err := mdb.driver.SelectContext(ctx, dbShardID, &rows, getHistoryNodesQuery,
Expand All @@ -74,34 +74,34 @@ func (mdb *db) SelectFromHistoryNode(ctx context.Context, filter *sqlplugin.Hist
}

// DeleteFromHistoryNode deletes one or more rows from history_node table
func (mdb *db) DeleteFromHistoryNode(ctx context.Context, filter *sqlplugin.HistoryNodeFilter) (sql.Result, error) {
func (mdb *DB) DeleteFromHistoryNode(ctx context.Context, filter *sqlplugin.HistoryNodeFilter) (sql.Result, error) {
dbShardID := sqlplugin.GetDBShardIDFromTreeID(filter.TreeID, mdb.GetTotalNumDBShards())
return mdb.driver.ExecContext(ctx, dbShardID, deleteHistoryNodesQuery, filter.ShardID, filter.TreeID, filter.BranchID, *filter.MinNodeID, filter.PageSize)
}

// For history_tree table:

// InsertIntoHistoryTree inserts a row into history_tree table
func (mdb *db) InsertIntoHistoryTree(ctx context.Context, row *sqlplugin.HistoryTreeRow) (sql.Result, error) {
func (mdb *DB) InsertIntoHistoryTree(ctx context.Context, row *sqlplugin.HistoryTreeRow) (sql.Result, error) {
dbShardID := sqlplugin.GetDBShardIDFromTreeID(row.TreeID, mdb.GetTotalNumDBShards())
return mdb.driver.NamedExecContext(ctx, dbShardID, addHistoryTreeQuery, row)
}

// SelectFromHistoryTree reads one or more rows from history_tree table
func (mdb *db) SelectFromHistoryTree(ctx context.Context, filter *sqlplugin.HistoryTreeFilter) ([]sqlplugin.HistoryTreeRow, error) {
func (mdb *DB) SelectFromHistoryTree(ctx context.Context, filter *sqlplugin.HistoryTreeFilter) ([]sqlplugin.HistoryTreeRow, error) {
var rows []sqlplugin.HistoryTreeRow
dbShardID := sqlplugin.GetDBShardIDFromTreeID(filter.TreeID, mdb.GetTotalNumDBShards())
err := mdb.driver.SelectContext(ctx, dbShardID, &rows, getHistoryTreeQuery, filter.ShardID, filter.TreeID)
return rows, err
}

// DeleteFromHistoryTree deletes one or more rows from history_tree table
func (mdb *db) DeleteFromHistoryTree(ctx context.Context, filter *sqlplugin.HistoryTreeFilter) (sql.Result, error) {
func (mdb *DB) DeleteFromHistoryTree(ctx context.Context, filter *sqlplugin.HistoryTreeFilter) (sql.Result, error) {
dbShardID := sqlplugin.GetDBShardIDFromTreeID(filter.TreeID, mdb.GetTotalNumDBShards())
return mdb.driver.ExecContext(ctx, dbShardID, deleteHistoryTreeQuery, filter.ShardID, filter.TreeID, *filter.BranchID)
}

func (mdb *db) GetAllHistoryTreeBranches(ctx context.Context, filter *sqlplugin.HistoryTreeFilter) ([]sqlplugin.HistoryTreeRow, error) {
func (mdb *DB) GetAllHistoryTreeBranches(ctx context.Context, filter *sqlplugin.HistoryTreeFilter) ([]sqlplugin.HistoryTreeRow, error) {
var rows []sqlplugin.HistoryTreeRow
dbShardID := sqlplugin.GetDBShardIDFromTreeID(filter.TreeID, mdb.GetTotalNumDBShards())
err := mdb.driver.SelectContext(ctx, dbShardID, &rows, getAllHistoryTreeQuery, filter.ShardID, filter.TreeID, *filter.BranchID, filter.ShardID, filter.TreeID, filter.ShardID, filter.PageSize)
Expand Down
Loading

0 comments on commit 747f55f

Please sign in to comment.