Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

align migrations table name with grabbit convention #140

Merged
merged 9 commits into from
Aug 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 31 additions & 18 deletions gbus/tx/mysql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ package mysql

import (
"database/sql"
"regexp"
"strings"

"github.com/lopezator/migrator"
"github.com/wework/grabbit/gbus/tx"
)

//SagaStoreTableMigration creates the service saga store table
func SagaStoreTableMigration(svcName string) *migrator.Migration {
tblName := tx.GetSagatableName(svcName)
func sagaStoreTableMigration(svcName string) *migrator.Migration {
tblName := tx.GrabbitTableNameTemplate(svcName, "sagas")

createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` (
rec_id INT PRIMARY KEY AUTO_INCREMENT,
Expand All @@ -33,10 +31,10 @@ func SagaStoreTableMigration(svcName string) *migrator.Migration {
}
}

//OutboxMigrations creates service outbox table
func OutboxMigrations(svcName string) *migrator.Migration {
func outboxMigrations(svcName string) *migrator.Migration {

query := `CREATE TABLE IF NOT EXISTS ` + getOutboxName(svcName) + ` (
tblName := tx.GrabbitTableNameTemplate(svcName, "outbox")
query := `CREATE TABLE IF NOT EXISTS ` + tblName + ` (
rec_id int NOT NULL AUTO_INCREMENT,
message_id varchar(50) NOT NULL UNIQUE,
message_type varchar(50) NOT NULL,
Expand All @@ -62,8 +60,7 @@ func OutboxMigrations(svcName string) *migrator.Migration {
}
}

//TimoutTableMigration creates the service timeout table, where timeouts are persisted
func TimoutTableMigration(svcName string) *migrator.Migration {
func timoutTableMigration(svcName string) *migrator.Migration {
tblName := GetTimeoutsTableName(svcName)

createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` (
Expand All @@ -85,14 +82,31 @@ func TimoutTableMigration(svcName string) *migrator.Migration {
}
}

func legacyMigrationsTable(svcName string) *migrator.Migration {

query := `DROP TABLE IF EXISTS grabbitmigrations_` + sanitizeSvcName(svcName)

return &migrator.Migration{
Name: "drop legacy migrations table",
Func: func(tx *sql.Tx) error {
if _, err := tx.Exec(query); err != nil {
return err
}
return nil
},
}
}

//EnsureSchema implements Grabbit's migrations strategy
func EnsureSchema(db *sql.DB, svcName string) {
migrationsTable := sanitizedMigrationsTable(svcName)

migrate, err := migrator.New(migrator.TableName(migrationsTable), migrator.Migrations(
OutboxMigrations(svcName),
SagaStoreTableMigration(svcName),
TimoutTableMigration(svcName),
tblName := tx.GrabbitTableNameTemplate(svcName, "migrations")

migrate, err := migrator.New(migrator.TableName(tblName), migrator.Migrations(
outboxMigrations(svcName),
sagaStoreTableMigration(svcName),
timoutTableMigration(svcName),
legacyMigrationsTable(svcName),
))
if err != nil {
panic(err)
Expand All @@ -103,9 +117,8 @@ func EnsureSchema(db *sql.DB, svcName string) {
}
}

func sanitizedMigrationsTable(svcName string) string {
var re = regexp.MustCompile(`-|;|\\|`)
sanitized := re.ReplaceAllString(svcName, "")
func sanitizeSvcName(svcName string) string {

return strings.ToLower("grabbitMigrations_" + sanitized)
sanitized := tx.SanitizeTableName(svcName)
return strings.ToLower(sanitized)
}
10 changes: 0 additions & 10 deletions gbus/tx/mysql/sanitize.go

This file was deleted.

9 changes: 2 additions & 7 deletions gbus/tx/mysql/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package mysql

import (
"database/sql"
"regexp"
"strings"
"time"

"github.com/sirupsen/logrus"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/tx"
)

var _ gbus.TimeoutManager = &TimeoutManager{}
Expand Down Expand Up @@ -173,11 +172,7 @@ func (tm *TimeoutManager) SetTimeoutFunction(timeoutFunc func(tx *sql.Tx, sagaID

//GetTimeoutsTableName returns the table name in which to store timeouts
func GetTimeoutsTableName(svcName string) string {

var re = regexp.MustCompile(`-|;|\\|`)
sanitized := re.ReplaceAllString(svcName, "")

return strings.ToLower("grabbit_" + sanitized + "_timeouts")
return tx.GrabbitTableNameTemplate(svcName, "timeouts")
}

//NewTimeoutManager creates a new instance of a mysql based TimeoutManager
Expand Down
11 changes: 6 additions & 5 deletions gbus/tx/mysql/txoutbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"database/sql"
"encoding/gob"
"fmt"
"strconv"
"sync"
"time"

"github.com/rs/xid"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"github.com/wework/grabbit/gbus"
"strconv"
"strings"
"sync"
"time"
"github.com/wework/grabbit/gbus/tx"
)

var (
Expand Down Expand Up @@ -315,5 +316,5 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows,

func getOutboxName(svcName string) string {

return strings.ToLower("grabbit_" + sanitizeTableName(svcName) + "_outbox")
return tx.GrabbitTableNameTemplate(svcName, "outbox")
}
23 changes: 23 additions & 0 deletions gbus/tx/sanitize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package tx

import (
"fmt"
"regexp"
"strings"
)

//SanitizeTableName returns a sanitizes and lower cased string for creating a table
func SanitizeTableName(dirty string) string {

var re = regexp.MustCompile(`-|;|\\|`)
sanitized := re.ReplaceAllString(dirty, "")
return strings.ToLower(sanitized)
}

//GrabbitTableNameTemplate returns the tamplated grabbit table name for the table type and service
func GrabbitTableNameTemplate(svcName, table string) string {
sanitized := SanitizeTableName(svcName)
templated := fmt.Sprintf("grabbit_%s_%s", sanitized, table)
return strings.ToLower(templated)

}