Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes mysql create indexes and adds complex mysql integration test #3345

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions backend/gen/go/db/dbschemas/mysql/system.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions backend/gen/go/db/dbschemas/postgresql/system.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 12 additions & 13 deletions backend/pkg/dbschemas/sql/mysql/queries/system.sql
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ ORDER BY


-- name: GetIndicesBySchemasAndTables :many
SELECT
SELECT
s.TABLE_SCHEMA as schema_name,
s.TABLE_NAME as table_name,
s.COLUMN_NAME as column_name,
Expand All @@ -220,23 +220,22 @@ SELECT
s.INDEX_TYPE as index_type,
s.SEQ_IN_INDEX as seq_in_index,
s.NULLABLE as nullable
FROM
INFORMATION_SCHEMA.STATISTICS s
LEFT JOIN
INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
ON s.TABLE_SCHEMA = kcu.CONSTRAINT_SCHEMA
AND s.TABLE_NAME = kcu.TABLE_NAME
AND s.COLUMN_NAME = kcu.COLUMN_NAME
WHERE
s.TABLE_SCHEMA = sqlc.arg('schema') AND s.TABLE_NAME in (sqlc.slice('tables'))
AND s.INDEX_NAME != 'PRIMARY'
AND kcu.CONSTRAINT_NAME IS NULL
ORDER BY
FROM information_schema.statistics s
LEFT JOIN information_schema.table_constraints tc
ON s.TABLE_SCHEMA = tc.TABLE_SCHEMA
AND s.TABLE_NAME = tc.TABLE_NAME
AND s.INDEX_NAME = tc.CONSTRAINT_NAME
WHERE
s.TABLE_SCHEMA = sqlc.arg('schema')
AND s.TABLE_NAME in (sqlc.slice('tables'))
AND tc.CONSTRAINT_NAME IS NULL -- filters out other constraints (foreign keys, unique, primary keys, etc)
ORDER BY
s.TABLE_NAME,
s.INDEX_NAME,
s.SEQ_IN_INDEX;



-- name: GetCustomFunctionsBySchemas :many
SELECT
ROUTINE_NAME as function_name,
Expand Down
60 changes: 43 additions & 17 deletions backend/pkg/sqlmanager/mysql/mysql-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,12 @@ func (m *MysqlManager) GetRolePermissionsMap(ctx context.Context) (map[string][]
return schemaTablePrivsMap, err
}

type indexInfo struct {
indexName string
indexType string
columns []string
}

func (m *MysqlManager) GetTableInitStatements(ctx context.Context, tables []*sqlmanager_shared.SchemaTable) ([]*sqlmanager_shared.TableInitStatement, error) {
if len(tables) == 0 {
return []*sqlmanager_shared.TableInitStatement{}, nil
Expand Down Expand Up @@ -351,7 +357,7 @@ func (m *MysqlManager) GetTableInitStatements(ctx context.Context, tables []*sql
})
}

indexmap := map[string]map[string][]string{}
indexmap := map[string]map[string]*indexInfo{}
var indexMapMu sync.Mutex
for schema, tables := range schemaset {
errgrp.Go(func() error {
Expand All @@ -368,17 +374,30 @@ func (m *MysqlManager) GetTableInitStatements(ctx context.Context, tables []*sql
for _, record := range idxrecords {
key := sqlmanager_shared.SchemaTable{Schema: record.SchemaName, Table: record.TableName}
if _, exists := indexmap[key.String()]; !exists {
indexmap[key.String()] = make(map[string][]string)
indexmap[key.String()] = make(map[string]*indexInfo)
}
// Group columns/expressions by index name
if record.ColumnName.Valid {
indexmap[key.String()][record.IndexName] = append(
indexmap[key.String()][record.IndexName],
if _, exists := indexmap[key.String()][record.IndexName]; !exists {
indexmap[key.String()][record.IndexName] = &indexInfo{
indexName: record.IndexName,
indexType: record.IndexType,
columns: []string{},
}
}
indexmap[key.String()][record.IndexName].columns = append(
indexmap[key.String()][record.IndexName].columns,
record.ColumnName.String,
)
} else if record.Expression.Valid {
indexmap[key.String()][record.IndexName] = append(
indexmap[key.String()][record.IndexName],
if _, exists := indexmap[key.String()][record.IndexName]; !exists {
indexmap[key.String()][record.IndexName] = &indexInfo{
indexName: record.IndexName,
indexType: record.IndexType,
columns: []string{},
}
}
indexmap[key.String()][record.IndexName].columns = append(
indexmap[key.String()][record.IndexName].columns,
// expressions must be wrapped in parentheses on creation, but don't come out of the DB in that format /shrug
fmt.Sprintf("(%s)", record.Expression.String),
)
Expand Down Expand Up @@ -451,10 +470,10 @@ func (m *MysqlManager) GetTableInitStatements(ctx context.Context, tables []*sql
info.AlterTableStatements = append(info.AlterTableStatements, stmt)
}
if tableIndices, ok := indexmap[key]; ok {
for idxName, cols := range tableIndices {
for _, idxInfo := range tableIndices {
info.IndexStatements = append(
info.IndexStatements,
wrapIdempotentIndex(schematable.Schema, schematable.Table, idxName, cols),
wrapIdempotentIndex(schematable.Schema, schematable.Table, idxInfo),
)
}
}
Expand Down Expand Up @@ -777,24 +796,31 @@ func hashInput(input ...string) string {
return hex.EncodeToString(hasher.Sum(nil))
}

func createIndexStmt(schema, table string, idxInfo *indexInfo, columnInput []string) string {
if strings.EqualFold(idxInfo.indexType, "spatial") || strings.EqualFold(idxInfo.indexType, "fulltext") {
return fmt.Sprintf("ALTER TABLE %s.%s ADD %s INDEX %s (%s);", EscapeMysqlColumn(schema), EscapeMysqlColumn(table), idxInfo.indexType, EscapeMysqlColumn(idxInfo.indexName), strings.Join(columnInput, ", "))
}
return fmt.Sprintf("ALTER TABLE %s.%s ADD INDEX %s (%s) USING %s;", EscapeMysqlColumn(schema), EscapeMysqlColumn(table), EscapeMysqlColumn(idxInfo.indexName), strings.Join(columnInput, ", "), idxInfo.indexType)
}

func wrapIdempotentIndex(
schema,
table,
constraintname string,
cols []string,
table string,
idxInfo *indexInfo,
) string {
hashParams := []string{schema, table, constraintname}
hashParams = append(hashParams, cols...)
hashParams := []string{schema, table, idxInfo.indexName}
hashParams = append(hashParams, idxInfo.columns...)

columnInput := []string{}
for _, col := range cols {
for _, col := range idxInfo.columns {
if strings.HasPrefix(col, "(") {
columnInput = append(columnInput, col)
} else {
columnInput = append(columnInput, EscapeMysqlColumn(col))
}
}
procedureName := fmt.Sprintf("NeosyncAddIndex_%s", hashInput(hashParams...))[:64]
indexStmt := createIndexStmt(schema, table, idxInfo, columnInput)
stmt := fmt.Sprintf(`
CREATE PROCEDURE %[1]s()
BEGIN
Expand All @@ -807,13 +833,13 @@ BEGIN
AND index_name = '%[4]s';

IF index_exists = 0 THEN
CREATE INDEX %[5]s ON %[6]s.%[7]s(%[8]s);
%s
END IF;
END;

CALL %[1]s();
DROP PROCEDURE %[1]s;
`, procedureName, schema, table, constraintname, EscapeMysqlColumn(constraintname), EscapeMysqlColumn(schema), EscapeMysqlColumn(table), strings.Join(columnInput, ", "))
`, procedureName, schema, table, idxInfo.indexName, indexStmt)
return strings.TrimSpace(stmt)
}

Expand Down
79 changes: 79 additions & 0 deletions internal/integration-tests/worker/workflow/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
tcmysql "github.com/nucleuscloud/neosync/internal/testutil/testcontainers/mysql"
testutil_testdata "github.com/nucleuscloud/neosync/internal/testutil/testdata"
mysql_alltypes "github.com/nucleuscloud/neosync/internal/testutil/testdata/mysql/alltypes"
mysql_complex "github.com/nucleuscloud/neosync/internal/testutil/testdata/mysql/complex"
mysql_composite_keys "github.com/nucleuscloud/neosync/internal/testutil/testdata/mysql/composite-keys"
mysql_edgecases "github.com/nucleuscloud/neosync/internal/testutil/testdata/mysql/edgecases"
mysql_human_resources "github.com/nucleuscloud/neosync/internal/testutil/testdata/mysql/humanresources"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -413,6 +415,83 @@ func test_mysql_on_conflict_do_update(
require.NoError(t, err)
}

func test_mysql_complex(
t *testing.T,
ctx context.Context,
mysql *tcmysql.MysqlTestSyncContainer,
neosyncApi *tcneosyncapi.NeosyncApiTestClient,
dbManagers *TestDatabaseManagers,
accountId string,
sourceConn, destConn *mgmtv1alpha1.Connection,
) {
jobclient := neosyncApi.OSSUnauthenticatedLicensedClients.Jobs()
schema := "complex"

err := mysql.Source.RunCreateStmtsInDatabase(ctx, mysqlTestdataFolder, []string{"complex/create-tables.sql", "complex/inserts.sql"}, schema)
require.NoError(t, err)

neosyncApi.MockTemporalForCreateJob("test-mysql-sync")

mappings := mysql_complex.GetDefaultSyncJobMappings(schema)

job := createMysqlSyncJob(t, ctx, jobclient, &createJobConfig{
AccountId: accountId,
SourceConn: sourceConn,
DestConn: destConn,
JobName: "mysql_complex",
JobMappings: mappings,
JobOptions: &TestJobOptions{
Truncate: false,
InitSchema: true,
OnConflictDoUpdate: false,
},
})

testworkflow := NewTestDataSyncWorkflowEnv(t, neosyncApi, dbManagers, WithMaxIterations(10), WithPageLimit(100))
testworkflow.RequireActivitiesCompletedSuccessfully(t)
testworkflow.ExecuteTestDataSyncWorkflow(job.GetId())
require.Truef(t, testworkflow.TestEnv.IsWorkflowCompleted(), "Workflow did not complete. Test: mysql_complex")
err = testworkflow.TestEnv.GetWorkflowError()
require.NoError(t, err, "Received Temporal Workflow Error: mysql_complex")

expectedResults := []struct {
schema string
table string
rowCount int
}{
{schema, "agency", 20},
{schema, "astronaut", 20},
{schema, "spacecraft", 20},
{schema, "celestial_body", 20},
{schema, "launch_site", 20},
{schema, "mission", 20},
{schema, "mission_crew", 20},
{schema, "research_project", 20},
{schema, "project_mission", 20},
{schema, "mission_log", 20},
{schema, "observatory", 20},
{schema, "telescope", 21},
{schema, "instrument", 20},
{schema, "observation_session", 20},
{schema, "data_set", 20},
{schema, "research_paper", 20},
{schema, "paper_citation", 20},
{schema, "grant", 20},
{schema, "grant_research_project", 20},
{schema, "instrument_usage", 20},
}

for _, expected := range expectedResults {
rowCount, err := mysql.Target.GetTableRowCount(ctx, expected.schema, expected.table)
require.NoError(t, err)
assert.Equalf(t, expected.rowCount, rowCount, fmt.Sprintf("Test: mysql_complex Table: %s", expected.table))
}

// tear down
err = cleanupMysqlDatabases(ctx, mysql, []string{schema})
require.NoError(t, err)
}

func cleanupMysqlDatabases(ctx context.Context, mysql *tcmysql.MysqlTestSyncContainer, databases []string) error {
errgrp, errctx := errgroup.WithContext(ctx)
errgrp.Go(func() error { return mysql.Source.DropDatabases(errctx, databases) })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ func Test_Workflow(t *testing.T) {
test_mysql_on_conflict_do_update(t, ctx, mysql, neosyncApi, dbManagers, accountId, sourceConn, destConn)
})

t.Run("complex", func(t *testing.T) {
t.Parallel()
test_mysql_complex(t, ctx, mysql, neosyncApi, dbManagers, accountId, sourceConn, destConn)
})

t.Cleanup(func() {
err := mysql.TearDown(ctx)
if err != nil {
Expand Down
Loading