Skip to content

Commit

Permalink
Merge pull request #144 from twinguy/main
Browse files Browse the repository at this point in the history
feat: implement central db migrations
  • Loading branch information
diyor28 authored Feb 24, 2025
2 parents 45f7396 + f540d05 commit 51d672d
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 72 deletions.
1 change: 1 addition & 0 deletions modules/core/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (m *Module) Register(app application.Application) error {
controllers.NewUploadController(app),
controllers.NewUsersController(app),
controllers.NewRolesController(app),
controllers.NewEmployeeController(app),

Check failure on line 63 in modules/core/module.go

View workflow job for this annotation

GitHub Actions / test

undefined: controllers.NewEmployeeController

Check failure on line 63 in modules/core/module.go

View workflow job for this annotation

GitHub Actions / test

undefined: controllers.NewEmployeeController
)
app.RegisterHashFsAssets(assets.HashFS)
app.RegisterGraphSchema(application.GraphSchema{
Expand Down
1 change: 1 addition & 0 deletions modules/warehouse/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package warehouse

import (
"embed"

icons "github.com/iota-uz/icons/phosphor"
"github.com/iota-uz/iota-sdk/modules/warehouse/infrastructure/persistence"
"github.com/iota-uz/iota-sdk/modules/warehouse/interfaces/graph"
Expand Down
3 changes: 3 additions & 0 deletions pkg/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,18 +244,21 @@ func CollectMigrations(app *application) ([]*migrate.Migration, error) {
if err != nil {
return nil, err
}

for _, file := range files {
content, err := migrationFs.ReadFile(file)
if err != nil {
return nil, err
}

migration, err := migrate.ParseMigration(filepath.Join(file), bytes.NewReader(content))
if err != nil {
return nil, err
}
migrations = append(migrations, migration)
}
}

return migrations, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/application/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package application
import (
"context"
"embed"

"github.com/iota-uz/iota-sdk/modules/core/domain/entities/permission"
"github.com/iota-uz/iota-sdk/pkg/spotlight"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down
20 changes: 9 additions & 11 deletions pkg/commands/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ import (
"context"
"errors"
"fmt"
"log"
"os"
"runtime/debug"
"time"

"github.com/iota-uz/iota-sdk/modules"
"github.com/iota-uz/iota-sdk/pkg/application"
"github.com/iota-uz/iota-sdk/pkg/configuration"
"github.com/iota-uz/iota-sdk/pkg/eventbus"
"github.com/iota-uz/iota-sdk/pkg/logging"
"github.com/iota-uz/iota-sdk/pkg/schema/ast"
"github.com/iota-uz/iota-sdk/pkg/schema/collector"
"github.com/iota-uz/iota-sdk/pkg/schema/diff"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/sirupsen/logrus"
)
Expand All @@ -36,14 +36,6 @@ func ensureDirectories() error {
}

func Migrate(mods ...application.Module) error {
defer func() {
if r := recover(); r != nil {
configuration.Use().Unload()
debug.PrintStack()
os.Exit(1)
}
}()

if len(os.Args) < 2 {
return ErrNoCommand
}
Expand All @@ -52,6 +44,12 @@ func Migrate(mods ...application.Module) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

logFile, logger, err := logging.FileLogger(conf.LogrusLogLevel())
if err != nil {
log.Fatalf("failed to create logger: %v", err)
}
defer logFile.Close()

if err := ensureDirectories(); err != nil {
return err
}
Expand All @@ -60,7 +58,7 @@ func Migrate(mods ...application.Module) error {

switch command {
case "collect":
return handleSchemaCommands(ctx, command, conf.LogrusLogLevel())
return handleSchemaCommands(ctx, command, logger.Level)
default:
return handleMigrationCommands(ctx, command, conf, mods...)
}
Expand Down
29 changes: 24 additions & 5 deletions pkg/schema/ast/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
alterTablePattern = regexp.MustCompile(`(?is)ALTER\s+TABLE\s+([^\s]+)\s+(.*)`)
constraintPattern = regexp.MustCompile(`(?i)^\s*(CONSTRAINT\s+\w+\s+|PRIMARY\s+KEY|FOREIGN\s+KEY|UNIQUE)\s*(.*)$`)
createIndexPattern = regexp.MustCompile(`(?is)CREATE\s+(?:UNIQUE\s+)?INDEX\s+(?:IF\s+NOT\s+EXISTS\s+)?([^\s]+)\s+ON\s+([^\s(]+)\s*\((.*)\)`)
referencesPattern = regexp.MustCompile(`(?i)REFERENCES\s+([^\s(]+)\s*(?:\(([^)]+)\))?`)
)

func (p *Parser) parseCreateTable(stmt string) (*types.Node, error) {
Expand Down Expand Up @@ -145,18 +146,36 @@ TypeFound:
constraints = strings.TrimSpace(def[typeEnd:])
}

// Extract REFERENCES from constraints if present
var references string
var referencedTable string
var referencedColumns string

if constraints != "" {
if matches := referencesPattern.FindStringSubmatch(constraints); matches != nil {
referencedTable = strings.Trim(matches[1], `"'`)
if len(matches) > 2 {
referencedColumns = matches[2]
}
references = matches[0]
}
}

// Build full definition
fullDef := strings.TrimSpace(fmt.Sprintf("%s %s %s", colName, dataType, constraints))

return &types.Node{
Type: types.NodeColumn,
Name: colName,
Metadata: map[string]interface{}{
"type": strings.Split(dataType, "(")[0],
"fullType": dataType,
"definition": fullDef,
"rawType": def,
"constraints": constraints,
"type": strings.Split(dataType, "(")[0],
"fullType": dataType,
"definition": fullDef,
"rawType": def,
"constraints": constraints,
"references": references,
"referenced_table": referencedTable,
"referenced_cols": referencedColumns,
},
}
}
Expand Down
45 changes: 40 additions & 5 deletions pkg/schema/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ func New(cfg Config) *Collector {
logger := cfg.Logger
if logger == nil {
logger = logrus.New()
logger.SetLevel(cfg.LogLevel)
// Default log level to INFO if not configured
if cfg.LogLevel == 0 {
cfg.LogLevel = logrus.InfoLevel
}
// logger.SetLevel(cfg.LogLevel)
} else {
logger.SetLevel(cfg.LogLevel)
}
Expand Down Expand Up @@ -476,6 +480,7 @@ func (c *Collector) loadModuleSchema() (*types.SchemaTree, error) {
// Track processed tables and indexes to avoid duplicates
processedTables := make(map[string]bool)
processedIndexes := make(map[string]bool)
droppedTables := make(map[string]bool) // Track tables that should be dropped

err := filepath.Walk(c.modulesDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
Expand All @@ -495,12 +500,35 @@ func (c *Collector) loadModuleSchema() (*types.SchemaTree, error) {
return nil
}

// Log found tables, columns, and indexes
// First pass: collect DROP TABLE statements
statements := strings.Split(sqlContent, ";")
for _, stmt := range statements {
stmt = strings.TrimSpace(stmt)
if strings.HasPrefix(strings.ToUpper(stmt), "DROP TABLE") {
// Extract table name from DROP TABLE statement
parts := strings.Fields(stmt)
if len(parts) >= 3 {
tableName := strings.ToLower(strings.TrimRight(parts[2], " \t\n\r;"))
tableName = strings.TrimPrefix(tableName, "IF EXISTS ")
tableName = strings.TrimSuffix(tableName, "CASCADE")
tableName = strings.TrimSpace(tableName)
droppedTables[tableName] = true
c.logger.Debugf("Marked table for dropping: %s", tableName)
}
}
}

// Second pass: process CREATE and ALTER statements
for _, node := range parsed.Root.Children {
switch node.Type {
case types.NodeTable:
tableName := strings.ToLower(node.Name)
c.logger.Debugf("Found table: %s with %d columns", node.Name, len(node.Children))

// Skip if table is marked for dropping
if droppedTables[tableName] {
c.logger.Debugf("Skipping dropped table: %s", tableName)
continue
}

// Skip if we've already processed this table
if processedTables[tableName] {
Expand All @@ -509,6 +537,7 @@ func (c *Collector) loadModuleSchema() (*types.SchemaTree, error) {
}
processedTables[tableName] = true

c.logger.Debugf("Found table: %s with %d columns", node.Name, len(node.Children))
for _, col := range node.Children {
if col.Type == types.NodeColumn {
c.logger.Debugf(" Column: %s, Type: %s, Constraints: %s",
Expand All @@ -524,7 +553,13 @@ func (c *Collector) loadModuleSchema() (*types.SchemaTree, error) {

case types.NodeIndex:
indexName := strings.ToLower(node.Name)
c.logger.Debugf("Found index: %s on table %s", node.Name, node.Metadata["table"])
tableName := strings.ToLower(node.Metadata["table"].(string))

// Skip if parent table is marked for dropping
if droppedTables[tableName] {
c.logger.Debugf("Skipping index for dropped table: %s", indexName)
continue
}

// Skip if we've already processed this index
if processedIndexes[indexName] {
Expand All @@ -533,7 +568,7 @@ func (c *Collector) loadModuleSchema() (*types.SchemaTree, error) {
}
processedIndexes[indexName] = true

// Add index to tree
c.logger.Debugf("Found index: %s on table %s", node.Name, node.Metadata["table"])
tree.Root.Children = append(tree.Root.Children, node)
c.logger.Debugf("Added index %s from %s", node.Name, path)
}
Expand Down
Loading

0 comments on commit 51d672d

Please sign in to comment.