Skip to content

Commit fae5fdc

Browse files
Postgres - adds support for exclusions, partitioned tables and fixes constraint query (#3318)
1 parent 635477b commit fae5fdc

File tree

11 files changed

+247
-280
lines changed

11 files changed

+247
-280
lines changed

backend/gen/go/db/dbschemas/postgresql/querier.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/gen/go/db/dbschemas/postgresql/system.sql.go

+34-43
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/pkg/dbschemas/sql/postgresql/queries/system.sql

+28-37
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ column_defaults AS (
7474
n.nspname NOT IN('pg_catalog', 'pg_toast', 'information_schema')
7575
AND a.attnum > 0
7676
AND NOT a.attisdropped
77-
AND c.relkind = 'r'
77+
AND c.relkind IN ('r', 'p')
78+
-- exclude child partitions
79+
AND c.relispartition = FALSE
7880
),
7981
identity_columns AS (
8082
SELECT
@@ -204,7 +206,9 @@ column_defaults AS (
204206
(n.nspname || '.' || c.relname) = ANY(sqlc.arg('schematables')::TEXT[])
205207
AND a.attnum > 0
206208
AND NOT a.attisdropped
207-
AND c.relkind = 'r'
209+
AND c.relkind IN ('r', 'p')
210+
-- exclude child partitions
211+
AND c.relispartition = FALSE
208212
),
209213
identity_columns AS (
210214
SELECT
@@ -572,43 +576,30 @@ ORDER BY
572576

573577
-- name: GetNonForeignKeyTableConstraintsBySchema :many
574578
SELECT
575-
tc.constraint_name,
576-
tc.constraint_type::TEXT AS constraint_type,
577-
tc.table_schema AS schema_name,
578-
tc.table_name,
579-
/* Collect all columns associated with this constraint, if any */
580-
ARRAY_AGG(kcu.column_name ORDER BY kcu.ordinal_position)
581-
FILTER (WHERE kcu.column_name IS NOT NULL)::TEXT[] AS constraint_columns,
582-
/* Pull the actual definition from pg_get_constraintdef for completeness */
583-
pg_get_constraintdef(pgcon.oid)::TEXT AS constraint_definition
584-
FROM information_schema.table_constraints AS tc
585-
/* LEFT JOIN so that constraints without specific columns (e.g. some CHECKs) aren’t lost */
586-
LEFT JOIN information_schema.key_column_usage AS kcu
587-
ON tc.constraint_name = kcu.constraint_name
588-
AND tc.table_schema = kcu.table_schema
589-
AND tc.table_name = kcu.table_name
590-
591-
/* Map the info_schema schema name to pg_namespace for use in pg_constraint */
592-
JOIN pg_catalog.pg_namespace AS pn
593-
ON pn.nspname = tc.table_schema
594-
595-
/* Retrieve the constraint definition from pg_get_constraintdef() */
596-
JOIN pg_catalog.pg_constraint AS pgcon
597-
ON pgcon.conname = tc.constraint_name
598-
AND pgcon.connamespace = pn.oid
579+
pn.nspname AS schema_name,
580+
c.relname AS table_name,
581+
pgcon.conname AS constraint_name,
582+
pgcon.contype::TEXT AS constraint_type,
583+
-- Collect all columns associated with this constraint, if any
584+
ARRAY_AGG(kcu.column_name ORDER BY kcu.ordinal_position) FILTER (WHERE kcu.column_name IS NOT NULL)::TEXT [] AS constraint_columns,
585+
pg_get_constraintdef(pgcon.oid)::TEXT AS constraint_definition
586+
FROM
587+
pg_catalog.pg_constraint pgcon
588+
JOIN pg_catalog.pg_namespace pn ON pn.oid = pgcon.connamespace /* schema info */
589+
JOIN pg_catalog.pg_class c ON c.oid = pgcon.conrelid /* table info */
590+
LEFT JOIN information_schema.key_column_usage AS kcu ON pgcon.conname = kcu.constraint_name /* column info */
591+
AND pn.nspname = kcu.table_schema
592+
AND c.relname = kcu.table_name
599593
WHERE
600-
tc.table_schema = ANY(sqlc.arg('schema')::TEXT[])
601-
/* Exclude foreign keys */
602-
AND tc.constraint_type <> 'FOREIGN KEY'
594+
pn.nspname = ANY(sqlc.arg('schemas')::TEXT[])
595+
-- Exclude foreign keys
596+
AND pgcon.contype != 'f'
603597
GROUP BY
604-
tc.constraint_name,
605-
tc.constraint_type,
606-
tc.table_schema,
607-
tc.table_name,
608-
pgcon.oid
609-
ORDER BY
610-
tc.table_name,
611-
tc.constraint_name;
598+
pgcon.oid,
599+
pgcon.conname,
600+
pgcon.contype,
601+
pn.nspname,
602+
c.relname;
612603

613604
-- name: GetForeignKeyConstraintsBySchemas :many
614605
SELECT

backend/pkg/sqlmanager/postgres/postgres-manager.go

+3-17
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,12 @@ func (p *PostgresManager) GetTableConstraintsBySchema(ctx context.Context, schem
163163
for _, row := range nonFkConstraints {
164164
tableName := sqlmanager_shared.BuildTable(row.SchemaName, row.TableName)
165165
switch row.ConstraintType {
166-
case "PRIMARY KEY":
166+
case "p":
167167
if _, exists := primaryKeyMap[tableName]; !exists {
168168
primaryKeyMap[tableName] = []string{}
169169
}
170170
primaryKeyMap[tableName] = append(primaryKeyMap[tableName], sqlmanager_shared.DedupeSlice(row.ConstraintColumns)...)
171-
case "UNIQUE":
171+
case "u":
172172
columns := sqlmanager_shared.DedupeSlice(row.ConstraintColumns)
173173
uniqueConstraintsMap[tableName] = append(uniqueConstraintsMap[tableName], columns)
174174
}
@@ -543,7 +543,7 @@ func (p *PostgresManager) GetTableInitStatements(ctx context.Context, tables []*
543543
if err != nil {
544544
return nil, err
545545
}
546-
constraintType, err := ToConstraintType(constraint.ConstraintType)
546+
constraintType, err := sqlmanager_shared.ToConstraintType(constraint.ConstraintType)
547547
if err != nil {
548548
return nil, fmt.Errorf("failed to convert constraint type '%s': %w", constraint.ConstraintType, err)
549549
}
@@ -1091,17 +1091,3 @@ func GetPostgresColumnOverrideAndResetProperties(columnInfo *sqlmanager_shared.D
10911091

10921092
return
10931093
}
1094-
1095-
func ToConstraintType(constraintType string) (sqlmanager_shared.ConstraintType, error) {
1096-
switch constraintType {
1097-
case "PRIMARY KEY":
1098-
return sqlmanager_shared.PrimaryConstraintType, nil
1099-
case "UNIQUE":
1100-
return sqlmanager_shared.UniqueConstraintType, nil
1101-
case "FOREIGN KEY":
1102-
return sqlmanager_shared.ForeignConstraintType, nil
1103-
case "CHECK":
1104-
return sqlmanager_shared.CheckConstraintType, nil
1105-
}
1106-
return -1, errors.ErrUnsupported
1107-
}

backend/pkg/sqlmanager/shared/types.go

+3
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ const (
8484
ForeignConstraintType
8585
UniqueConstraintType
8686
CheckConstraintType
87+
ExclusionConstraintType
8788
)
8889

8990
func ToConstraintType(constraintType string) (ConstraintType, error) {
@@ -96,6 +97,8 @@ func ToConstraintType(constraintType string) (ConstraintType, error) {
9697
return ForeignConstraintType, nil
9798
case "c":
9899
return CheckConstraintType, nil
100+
case "x":
101+
return ExclusionConstraintType, nil
99102
}
100103
return -1, errors.ErrUnsupported
101104
}

internal/integration-tests/sqlmanager/postgres/testdata/setup.sql

+3-4
Original file line numberDiff line numberDiff line change
@@ -277,22 +277,21 @@ CREATE SEQUENCE flights_flight_id_seq
277277

278278
ALTER SEQUENCE flights_flight_id_seq OWNED BY flights.flight_id;
279279

280+
-- tests duplicate check constraint name
280281
CREATE TABLE seats (
281282
aircraft_code character(3) NOT NULL,
282283
seat_no character varying(4) NOT NULL,
283284
fare_conditions character varying(10) NOT NULL,
284-
CONSTRAINT seats_fare_conditions_check CHECK (((fare_conditions)::text = ANY (ARRAY[('Economy'::character varying)::text, ('Comfort'::character varying)::text, ('Business'::character varying)::text])))
285+
CONSTRAINT fare_conditions_check CHECK (((fare_conditions)::text = ANY (ARRAY[('Economy'::character varying)::text, ('Comfort'::character varying)::text, ('Business'::character varying)::text])))
285286
);
286287

287-
288-
289288
CREATE TABLE ticket_flights (
290289
ticket_no character(13) NOT NULL,
291290
flight_id integer NOT NULL,
292291
fare_conditions character varying(10) NOT NULL,
293292
amount numeric(10,2) NOT NULL,
294293
CONSTRAINT ticket_flights_amount_check CHECK ((amount >= (0)::numeric)),
295-
CONSTRAINT ticket_flights_fare_conditions_check CHECK (((fare_conditions)::text = ANY (ARRAY[('Economy'::character varying)::text, ('Comfort'::character varying)::text, ('Business'::character varying)::text])))
294+
CONSTRAINT fare_conditions_check CHECK (((fare_conditions)::text = ANY (ARRAY[('Economy'::character varying)::text, ('Comfort'::character varying)::text, ('Business'::character varying)::text])))
296295
);
297296

298297

internal/integration-tests/worker/workflow/postgres_test.go

+4-7
Original file line numberDiff line numberDiff line change
@@ -1041,9 +1041,6 @@ func test_postgres_complex(
10411041
folder := testdataFolder + "/complex"
10421042
err := postgres.Source.RunSqlFiles(ctx, &folder, []string{"create-tables.sql", "inserts.sql"})
10431043
require.NoError(t, err)
1044-
// should use init schema but currently broken
1045-
err = postgres.Target.RunSqlFiles(ctx, &folder, []string{"create-tables.sql"})
1046-
require.NoError(t, err)
10471044

10481045
jobmappings := pg_complex.GetDefaultSyncJobMappings()
10491046

@@ -1059,7 +1056,7 @@ func test_postgres_complex(
10591056
JobOptions: &TestJobOptions{
10601057
Truncate: true,
10611058
TruncateCascade: true,
1062-
InitSchema: false,
1059+
InitSchema: true,
10631060
SubsetByForeignKeyConstraints: true,
10641061
},
10651062
})
@@ -1109,7 +1106,7 @@ func test_postgres_complex(
11091106
// {schema: "space_mission", table: "system_events", rowCount: 3},
11101107
// {schema: "space_mission", table: "astronaut_events", rowCount: 3},
11111108
// {schema: "space_mission", table: "mission_events", rowCount: 3},
1112-
// {schema: "space_mission", table: "telemetry", rowCount: 6},
1109+
{schema: "space_mission", table: "telemetry", rowCount: 6},
11131110
{schema: "space_mission", table: "comments", rowCount: 4},
11141111
{schema: "space_mission", table: "tags", rowCount: 4},
11151112
{schema: "space_mission", table: "taggables", rowCount: 4},
@@ -1153,7 +1150,7 @@ func test_postgres_complex(
11531150
JobOptions: &TestJobOptions{
11541151
Truncate: true,
11551152
TruncateCascade: true,
1156-
InitSchema: false,
1153+
InitSchema: true,
11571154
SubsetByForeignKeyConstraints: true,
11581155
},
11591156
})
@@ -1203,7 +1200,7 @@ func test_postgres_complex(
12031200
// {schema: "space_mission", table: "system_events", rowCount: 3},
12041201
// {schema: "space_mission", table: "astronaut_events", rowCount: 3},
12051202
// {schema: "space_mission", table: "mission_events", rowCount: 3},
1206-
// {schema: "space_mission", table: "telemetry", rowCount: 6},
1203+
{schema: "space_mission", table: "telemetry", rowCount: 6},
12071204
{schema: "space_mission", table: "comments", rowCount: 4},
12081205
{schema: "space_mission", table: "tags", rowCount: 4},
12091206
{schema: "space_mission", table: "taggables", rowCount: 4},

internal/testutil/testcontainers/postgres/postgres.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ func (p *PostgresTestContainer) DropSchemas(ctx context.Context, schemas []strin
317317
}
318318

319319
func (p *PostgresTestContainer) GetTableRowCount(ctx context.Context, schema, table string) (int, error) {
320-
rows := p.DB.QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %q.%q;", schema, table))
320+
rows := p.DB.QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM ONLY %q.%q;", schema, table))
321321
var count int
322322
err := rows.Scan(&count)
323323
if err != nil {

0 commit comments

Comments
 (0)