Skip to content

Commit

Permalink
align migrations table name with grabbit convention (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
Guy Baron authored Aug 25, 2019
1 parent 58b7cec commit a916269
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 40 deletions.
49 changes: 31 additions & 18 deletions gbus/tx/mysql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ package mysql

import (
"database/sql"
"regexp"
"strings"

"github.com/lopezator/migrator"
"github.com/wework/grabbit/gbus/tx"
)

//SagaStoreTableMigration creates the service saga store table
func SagaStoreTableMigration(svcName string) *migrator.Migration {
tblName := tx.GetSagatableName(svcName)
func sagaStoreTableMigration(svcName string) *migrator.Migration {
tblName := tx.GrabbitTableNameTemplate(svcName, "sagas")

createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` (
rec_id INT PRIMARY KEY AUTO_INCREMENT,
Expand All @@ -33,10 +31,10 @@ func SagaStoreTableMigration(svcName string) *migrator.Migration {
}
}

//OutboxMigrations creates service outbox table
func OutboxMigrations(svcName string) *migrator.Migration {
func outboxMigrations(svcName string) *migrator.Migration {

query := `CREATE TABLE IF NOT EXISTS ` + getOutboxName(svcName) + ` (
tblName := tx.GrabbitTableNameTemplate(svcName, "outbox")
query := `CREATE TABLE IF NOT EXISTS ` + tblName + ` (
rec_id int NOT NULL AUTO_INCREMENT,
message_id varchar(50) NOT NULL UNIQUE,
message_type varchar(50) NOT NULL,
Expand All @@ -62,8 +60,7 @@ func OutboxMigrations(svcName string) *migrator.Migration {
}
}

//TimoutTableMigration creates the service timeout table, where timeouts are persisted
func TimoutTableMigration(svcName string) *migrator.Migration {
func timoutTableMigration(svcName string) *migrator.Migration {
tblName := GetTimeoutsTableName(svcName)

createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` (
Expand All @@ -85,14 +82,31 @@ func TimoutTableMigration(svcName string) *migrator.Migration {
}
}

func legacyMigrationsTable(svcName string) *migrator.Migration {

query := `DROP TABLE IF EXISTS grabbitmigrations_` + sanitizeSvcName(svcName)

return &migrator.Migration{
Name: "drop legacy migrations table",
Func: func(tx *sql.Tx) error {
if _, err := tx.Exec(query); err != nil {
return err
}
return nil
},
}
}

//EnsureSchema implements Grabbit's migrations strategy
func EnsureSchema(db *sql.DB, svcName string) {
migrationsTable := sanitizedMigrationsTable(svcName)

migrate, err := migrator.New(migrator.TableName(migrationsTable), migrator.Migrations(
OutboxMigrations(svcName),
SagaStoreTableMigration(svcName),
TimoutTableMigration(svcName),
tblName := tx.GrabbitTableNameTemplate(svcName, "migrations")

migrate, err := migrator.New(migrator.TableName(tblName), migrator.Migrations(
outboxMigrations(svcName),
sagaStoreTableMigration(svcName),
timoutTableMigration(svcName),
legacyMigrationsTable(svcName),
))
if err != nil {
panic(err)
Expand All @@ -103,9 +117,8 @@ func EnsureSchema(db *sql.DB, svcName string) {
}
}

func sanitizedMigrationsTable(svcName string) string {
var re = regexp.MustCompile(`-|;|\\|`)
sanitized := re.ReplaceAllString(svcName, "")
func sanitizeSvcName(svcName string) string {

return strings.ToLower("grabbitMigrations_" + sanitized)
sanitized := tx.SanitizeTableName(svcName)
return strings.ToLower(sanitized)
}
10 changes: 0 additions & 10 deletions gbus/tx/mysql/sanitize.go

This file was deleted.

9 changes: 2 additions & 7 deletions gbus/tx/mysql/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package mysql

import (
"database/sql"
"regexp"
"strings"
"time"

"github.com/sirupsen/logrus"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/tx"
)

var _ gbus.TimeoutManager = &TimeoutManager{}
Expand Down Expand Up @@ -173,11 +172,7 @@ func (tm *TimeoutManager) SetTimeoutFunction(timeoutFunc func(tx *sql.Tx, sagaID

//GetTimeoutsTableName returns the table name in which to store timeouts
func GetTimeoutsTableName(svcName string) string {

var re = regexp.MustCompile(`-|;|\\|`)
sanitized := re.ReplaceAllString(svcName, "")

return strings.ToLower("grabbit_" + sanitized + "_timeouts")
return tx.GrabbitTableNameTemplate(svcName, "timeouts")
}

//NewTimeoutManager creates a new instance of a mysql based TimeoutManager
Expand Down
11 changes: 6 additions & 5 deletions gbus/tx/mysql/txoutbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"database/sql"
"encoding/gob"
"fmt"
"strconv"
"sync"
"time"

"github.com/rs/xid"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"github.com/wework/grabbit/gbus"
"strconv"
"strings"
"sync"
"time"
"github.com/wework/grabbit/gbus/tx"
)

var (
Expand Down Expand Up @@ -315,5 +316,5 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows,

func getOutboxName(svcName string) string {

return strings.ToLower("grabbit_" + sanitizeTableName(svcName) + "_outbox")
return tx.GrabbitTableNameTemplate(svcName, "outbox")
}
23 changes: 23 additions & 0 deletions gbus/tx/sanitize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package tx

import (
"fmt"
"regexp"
"strings"
)

//SanitizeTableName returns a sanitizes and lower cased string for creating a table
func SanitizeTableName(dirty string) string {

var re = regexp.MustCompile(`-|;|\\|`)
sanitized := re.ReplaceAllString(dirty, "")
return strings.ToLower(sanitized)
}

//GrabbitTableNameTemplate returns the tamplated grabbit table name for the table type and service
func GrabbitTableNameTemplate(svcName, table string) string {
sanitized := SanitizeTableName(svcName)
templated := fmt.Sprintf("grabbit_%s_%s", sanitized, table)
return strings.ToLower(templated)

}

0 comments on commit a916269

Please sign in to comment.