Skip to content

Commit

Permalink
[Advanced Visibility with SQL] Adding PostgreSQL 12 schema and interf…
Browse files Browse the repository at this point in the history
…ace (#3844)

* Create base schema for PostgreSQL 12

* Changes to PostgreSQL 12 visibility schame to support advanced visibility

* Create PostgreSQL 12 db interface and configs
  • Loading branch information
rodrigozhou authored Jan 31, 2023
1 parent f3cf504 commit cb22aab
Show file tree
Hide file tree
Showing 49 changed files with 1,448 additions and 16 deletions.
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,17 @@ install-schema-postgresql: temporal-sql-tool
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) -p 5432 --pl postgres --db $(VISIBILITY_DB) setup-schema -v 0.0
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) -p 5432 --pl postgres --db $(VISIBILITY_DB) update-schema -d ./schema/postgresql/v96/visibility/versioned

install-schema-postgresql12: temporal-sql-tool
@printf $(COLOR) "Install Postgres schema..."
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) -p 5432 --pl postgres --db $(TEMPORAL_DB) drop -f
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) -p 5432 --pl postgres --db $(TEMPORAL_DB) create
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) -p 5432 --pl postgres --db $(TEMPORAL_DB) setup -v 0.0
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) -p 5432 --pl postgres --db $(TEMPORAL_DB) update-schema -d ./schema/postgresql/v12/temporal/versioned
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) -p 5432 --pl postgres --db $(VISIBILITY_DB) drop -f
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) -p 5432 --pl postgres --db $(VISIBILITY_DB) create
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) -p 5432 --pl postgres --db $(VISIBILITY_DB) setup-schema -v 0.0
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) -p 5432 --pl postgres --db $(VISIBILITY_DB) update-schema -d ./schema/postgresql/v12/visibility/versioned

install-schema-es:
@printf $(COLOR) "Install Elasticsearch schema..."
curl --fail -X PUT "http://127.0.0.1:9200/_cluster/settings" -H "Content-Type: application/json" --data-binary @./schema/elasticsearch/visibility/cluster_settings_v7.json --write-out "\n"
Expand Down Expand Up @@ -450,6 +461,9 @@ start-mysql-es: temporal-server
start-postgres: temporal-server
./temporal-server --env development-postgres --allow-no-auth start

start-postgres12: temporal-server
./temporal-server --env development-postgres12 --allow-no-auth start

start-sqlite: temporal-server
./temporal-server --env development-sqlite --allow-no-auth start

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func NewTestBaseWithSQL(options *TestBaseOptions) TestBase {
switch options.SQLDBPluginName {
case mysql.PluginName, mysql.PluginNameV8:
options.DBPort = environment.GetMySQLPort()
case postgresql.PluginName:
case postgresql.PluginName, postgresql.PluginNameV12:
options.DBPort = environment.GetPostgreSQLPort()
case sqlite.PluginName:
options.DBPort = 0
Expand Down
20 changes: 17 additions & 3 deletions common/persistence/persistence-tests/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ const (
testMySQLSchemaDir = "schema/mysql/v57"
testMySQL8SchemaDir = "schema/mysql/v8"

testPostgreSQLUser = "temporal"
testPostgreSQLPassword = "temporal"
testPostgreSQLSchemaDir = "schema/postgresql/v96"
testPostgreSQLUser = "temporal"
testPostgreSQLPassword = "temporal"
testPostgreSQLSchemaDir = "schema/postgresql/v96"
testPostgreSQL12SchemaDir = "schema/postgresql/v12"

testSQLiteUser = ""
testSQLitePassword = ""
Expand Down Expand Up @@ -88,6 +89,19 @@ func GetPostgreSQLTestClusterOption() *TestBaseOptions {
}
}

// GetPostgreSQL12TestClusterOption return test options
func GetPostgreSQL12TestClusterOption() *TestBaseOptions {
return &TestBaseOptions{
SQLDBPluginName: postgresql.PluginName,
DBUsername: testPostgreSQLUser,
DBPassword: testPostgreSQLPassword,
DBHost: environment.GetPostgreSQLAddress(),
DBPort: environment.GetPostgreSQLPort(),
SchemaDir: testPostgreSQL12SchemaDir,
StoreType: config.StoreTypeSQL,
}
}

// GetSQLiteTestClusterOption return test options
func GetSQLiteFileTestClusterOption() *TestBaseOptions {
return &TestBaseOptions{
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/sqlplugin/postgresql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

"go.temporal.io/server/common/persistence/schema"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
postgresqlschema "go.temporal.io/server/schema/postgresql"
postgresqlschemaV96 "go.temporal.io/server/schema/postgresql/v96"
)

// ErrDupEntryCode indicates a duplicate primary key i.e. the row already exists,
Expand Down Expand Up @@ -119,9 +119,9 @@ func (pdb *db) DbName() string {
func (pdb *db) ExpectedVersion() string {
switch pdb.dbKind {
case sqlplugin.DbKindMain:
return postgresqlschema.Version
return postgresqlschemaV96.Version
case sqlplugin.DbKindVisibility:
return postgresqlschema.VisibilityVersion
return postgresqlschemaV96.VisibilityVersion
default:
panic(fmt.Sprintf("unknown db kind %v", pdb.dbKind))
}
Expand Down
100 changes: 100 additions & 0 deletions common/persistence/sql/sqlplugin/postgresql/db_v12.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package postgresql

import (
"context"
"fmt"

"github.com/jmoiron/sqlx"

"go.temporal.io/server/common/persistence/schema"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
postgresqlschemaV12 "go.temporal.io/server/schema/postgresql/v12"
)

// dbV12 represents a logical connection to mysql database
type dbV12 struct {
db
}

var _ sqlplugin.DB = (*dbV12)(nil)
var _ sqlplugin.Tx = (*dbV12)(nil)

// newDB returns an instance of DB, which is a logical
// connection to the underlying postgresql database
func newDBV12(
dbKind sqlplugin.DbKind,
dbName string,
xdb *sqlx.DB,
tx *sqlx.Tx,
) *dbV12 {
mdb := &dbV12{
db: db{
dbKind: dbKind,
dbName: dbName,
db: xdb,
tx: tx,
},
}
mdb.conn = xdb
if tx != nil {
mdb.conn = tx
}
mdb.converter = &converter{}
return mdb
}

// BeginTx starts a new transaction and returns a reference to the Tx object
func (pdb *dbV12) BeginTx(ctx context.Context) (sqlplugin.Tx, error) {
xtx, err := pdb.db.db.BeginTxx(ctx, nil)
if err != nil {
return nil, err
}
return newDB(pdb.dbKind, pdb.dbName, pdb.db.db, xtx), nil
}

// PluginName returns the name of the mysql plugin
func (pdb *dbV12) PluginName() string {
return PluginNameV12
}

// ExpectedVersion returns expected version.
func (pdb *dbV12) ExpectedVersion() string {
switch pdb.dbKind {
case sqlplugin.DbKindMain:
return postgresqlschemaV12.Version
case sqlplugin.DbKindVisibility:
return postgresqlschemaV12.VisibilityVersion
default:
panic(fmt.Sprintf("unknown db kind %v", pdb.dbKind))
}
}

// VerifyVersion verify schema version is up to date
func (pdb *dbV12) VerifyVersion() error {
expectedVersion := pdb.ExpectedVersion()
return schema.VerifyCompatibleVersion(pdb, pdb.dbName, expectedVersion)
}
75 changes: 75 additions & 0 deletions common/persistence/sql/sqlplugin/postgresql/plugin_v12.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package postgresql

import (
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
"go.temporal.io/server/common/resolver"
)

const (
// PluginName is the name of the plugin
PluginNameV12 = "postgres12"
)

type pluginV12 struct {
plugin
}

var _ sqlplugin.Plugin = (*pluginV12)(nil)

func init() {
sql.RegisterPlugin(PluginNameV12, &pluginV12{})
}

// CreateDB initialize the db object
func (d *pluginV12) CreateDB(
dbKind sqlplugin.DbKind,
cfg *config.SQL,
r resolver.ServiceResolver,
) (sqlplugin.DB, error) {
conn, err := d.createDBConnection(cfg, r)
if err != nil {
return nil, err
}
db := newDBV12(dbKind, cfg.DatabaseName, conn, nil)
return db, nil
}

// CreateAdminDB initialize the adminDB object
func (d *pluginV12) CreateAdminDB(
dbKind sqlplugin.DbKind,
cfg *config.SQL,
r resolver.ServiceResolver,
) (sqlplugin.AdminDB, error) {
conn, err := d.createDBConnection(cfg, r)
if err != nil {
return nil, err
}
db := newDBV12(dbKind, cfg.DatabaseName, conn, nil)
return db, nil
}
28 changes: 28 additions & 0 deletions common/persistence/tests/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ func TestPostgreSQLVisibilityPersistenceSuite(t *testing.T) {
suite.Run(t, s)
}

func TestPostgreSQL12VisibilityPersistenceSuite(t *testing.T) {
s := &VisibilityPersistenceSuite{
TestBase: persistencetests.NewTestBaseWithSQL(persistencetests.GetPostgreSQL12TestClusterOption()),
}
suite.Run(t, s)
}

// TODO: Merge persistence-tests into the tests directory.

func TestPostgreSQLHistoryV2PersistenceSuite(t *testing.T) {
Expand Down Expand Up @@ -189,6 +196,27 @@ FAIL: TestPostgreSQLQueuePersistence/TestNamespaceReplicationQueue (0.26s)
// suite.Run(t, s)
//}

func TestPostgreSQL12HistoryV2PersistenceSuite(t *testing.T) {
s := new(persistencetests.HistoryV2PersistenceSuite)
s.TestBase = persistencetests.NewTestBaseWithSQL(persistencetests.GetPostgreSQL12TestClusterOption())
s.TestBase.Setup(nil)
suite.Run(t, s)
}

func TestPostgreSQL12MetadataPersistenceSuiteV2(t *testing.T) {
s := new(persistencetests.MetadataPersistenceSuiteV2)
s.TestBase = persistencetests.NewTestBaseWithSQL(persistencetests.GetPostgreSQL12TestClusterOption())
s.TestBase.Setup(nil)
suite.Run(t, s)
}

func TestPostgreSQL12ClusterMetadataPersistence(t *testing.T) {
s := new(persistencetests.ClusterMetadataManagerSuite)
s.TestBase = persistencetests.NewTestBaseWithSQL(persistencetests.GetPostgreSQL12TestClusterOption())
s.TestBase.Setup(nil)
suite.Run(t, s)
}

// SQL store tests

func TestPostgreSQLNamespaceSuite(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/tests/postgresql_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
_ "go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql"
"go.temporal.io/server/common/resolver"
"go.temporal.io/server/common/shuffle"
"go.temporal.io/server/environment"
Expand All @@ -56,8 +56,8 @@ const (

// TODO hard code this dir for now
// need to merge persistence test config / initialization in one place
testPostgreSQLExecutionSchema = "../../../schema/postgresql/v96/temporal/schema.sql"
testPostgreSQLVisibilitySchema = "../../../schema/postgresql/v96/visibility/schema.sql"
testPostgreSQLExecutionSchema = "../../../schema/postgresql/v12/temporal/schema.sql"
testPostgreSQLVisibilitySchema = "../../../schema/postgresql/v12/visibility/schema.sql"
)

type (
Expand Down Expand Up @@ -100,7 +100,7 @@ func NewPostgreSQLConfig() *config.SQL {
strconv.Itoa(environment.GetPostgreSQLPort()),
),
ConnectProtocol: testPostgreSQLConnectionProtocol,
PluginName: "postgres",
PluginName: postgresql.PluginNameV12,
DatabaseName: testPostgreSQLDatabaseNamePrefix + shuffle.String(testPostgreSQLDatabaseNameSuffix),
}
}
Expand Down
Loading

0 comments on commit cb22aab

Please sign in to comment.