diff --git a/backend/.sqlx/query-011c7638eeeda710deb86a216a9e10df9c3e9458e85bcdde466b01011a1f2ac2.json b/backend/.sqlx/query-011c7638eeeda710deb86a216a9e10df9c3e9458e85bcdde466b01011a1f2ac2.json new file mode 100644 index 0000000000000..749b684fd97a0 --- /dev/null +++ b/backend/.sqlx/query-011c7638eeeda710deb86a216a9e10df9c3e9458e85bcdde466b01011a1f2ac2.json @@ -0,0 +1,50 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n path,\n is_flow,\n workspace_id,\n owner,\n email,\n trigger_config as \"trigger_config!: _\"\n FROM\n capture_config\n WHERE\n trigger_kind = 'postgres' AND\n last_client_ping > NOW() - INTERVAL '10 seconds' AND\n trigger_config IS NOT NULL AND\n (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds')\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "path", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "is_flow", + "type_info": "Bool" + }, + { + "ordinal": 2, + "name": "workspace_id", + "type_info": "Varchar" + }, + { + "ordinal": 3, + "name": "owner", + "type_info": "Varchar" + }, + { + "ordinal": 4, + "name": "email", + "type_info": "Varchar" + }, + { + "ordinal": 5, + "name": "trigger_config!: _", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false, + false, + false, + true + ] + }, + "hash": "011c7638eeeda710deb86a216a9e10df9c3e9458e85bcdde466b01011a1f2ac2" +} diff --git a/backend/.sqlx/query-07da723ce5c9ee2d7c236e8eabe254c783fc34b617c8a9a95a0eb0cda535dab5.json b/backend/.sqlx/query-07da723ce5c9ee2d7c236e8eabe254c783fc34b617c8a9a95a0eb0cda535dab5.json index a3bc7c1ef6d63..f7685bf7eb3df 100644 --- a/backend/.sqlx/query-07da723ce5c9ee2d7c236e8eabe254c783fc34b617c8a9a95a0eb0cda535dab5.json +++ b/backend/.sqlx/query-07da723ce5c9ee2d7c236e8eabe254c783fc34b617c8a9a95a0eb0cda535dab5.json @@ -18,7 +18,8 @@ "websocket", "kafka", "email", - "nats" + "nats", + "postgres" ] } } diff --git a/backend/.sqlx/query-12e868b63a7c622c76713db5a5577a927efca4ae49a15c2b999e2410f2a312ff.json b/backend/.sqlx/query-12e868b63a7c622c76713db5a5577a927efca4ae49a15c2b999e2410f2a312ff.json new file mode 100644 index 0000000000000..49ad10ac078a8 --- /dev/null +++ b/backend/.sqlx/query-12e868b63a7c622c76713db5a5577a927efca4ae49a15c2b999e2410f2a312ff.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE \n capture_config \n SET \n last_server_ping = NULL \n WHERE \n workspace_id = $1 AND \n path = $2 AND \n is_flow = $3 AND \n trigger_kind = 'postgres' AND \n server_id IS NULL\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "12e868b63a7c622c76713db5a5577a927efca4ae49a15c2b999e2410f2a312ff" +} diff --git a/backend/.sqlx/query-1488e1b5007752e1ebae4235ad04c398fe6398745e16fd119008b8ea67662416.json b/backend/.sqlx/query-1488e1b5007752e1ebae4235ad04c398fe6398745e16fd119008b8ea67662416.json new file mode 100644 index 0000000000000..197298f788619 --- /dev/null +++ b/backend/.sqlx/query-1488e1b5007752e1ebae4235ad04c398fe6398745e16fd119008b8ea67662416.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE postgres_trigger \n SET \n server_id = $1, \n last_server_ping = now(),\n error = 'Connecting...'\n WHERE \n enabled IS TRUE \n AND workspace_id = $2 \n AND path = $3 \n AND (last_server_ping IS NULL \n OR last_server_ping < now() - INTERVAL '15 seconds'\n ) \n RETURNING true\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": 
"Bool" + } + ], + "parameters": { + "Left": [ + "Varchar", + "Text", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "1488e1b5007752e1ebae4235ad04c398fe6398745e16fd119008b8ea67662416" +} diff --git a/backend/.sqlx/query-1974bd65bbf40024773aad4dee1c50b12e110e76bb58e6de25bec094e758a71c.json b/backend/.sqlx/query-1974bd65bbf40024773aad4dee1c50b12e110e76bb58e6de25bec094e758a71c.json new file mode 100644 index 0000000000000..f19670704db94 --- /dev/null +++ b/backend/.sqlx/query-1974bd65bbf40024773aad4dee1c50b12e110e76bb58e6de25bec094e758a71c.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE \n capture_config \n SET \n last_server_ping = now(), \n error = $1 \n WHERE \n workspace_id = $2 AND \n path = $3 AND \n is_flow = $4 AND \n trigger_kind = 'postgres' AND \n server_id = $5 AND \n last_client_ping > NOW() - INTERVAL '10 seconds' \n RETURNING 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Bool", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "1974bd65bbf40024773aad4dee1c50b12e110e76bb58e6de25bec094e758a71c" +} diff --git a/backend/.sqlx/query-29f096ec62c4abb1435a5667e2b30e9c1724e419cdc23ef1b300e84c02a20427.json b/backend/.sqlx/query-29f096ec62c4abb1435a5667e2b30e9c1724e419cdc23ef1b300e84c02a20427.json new file mode 100644 index 0000000000000..b7075f9df1e9e --- /dev/null +++ b/backend/.sqlx/query-29f096ec62c4abb1435a5667e2b30e9c1724e419cdc23ef1b300e84c02a20427.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE \n capture_config \n SET \n error = $1, \n server_id = NULL, \n last_server_ping = NULL \n WHERE \n workspace_id = $2 AND \n path = $3 AND \n is_flow = $4 AND \n trigger_kind = 'postgres'\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "29f096ec62c4abb1435a5667e2b30e9c1724e419cdc23ef1b300e84c02a20427" +} diff --git a/backend/.sqlx/query-5c1de8473e0e96c1063a9a735a064c5a91e3ed8d9260c72b783fc12542b88fbd.json b/backend/.sqlx/query-5c1de8473e0e96c1063a9a735a064c5a91e3ed8d9260c72b783fc12542b88fbd.json index 890a282ad1524..b6461d711fbcd 100644 --- a/backend/.sqlx/query-5c1de8473e0e96c1063a9a735a064c5a91e3ed8d9260c72b783fc12542b88fbd.json +++ b/backend/.sqlx/query-5c1de8473e0e96c1063a9a735a064c5a91e3ed8d9260c72b783fc12542b88fbd.json @@ -26,7 +26,8 @@ "websocket", "kafka", "email", - "nats" + "nats", + "postgres" ] } } @@ -58,7 +59,8 @@ "websocket", "kafka", "email", - "nats" + "nats", + "postgres" ] } } diff --git a/backend/.sqlx/query-61bed1bc6d3e6a3c1d640eeacc290a85d8b63ee36c39dfbf4348d120f6e561ae.json b/backend/.sqlx/query-61bed1bc6d3e6a3c1d640eeacc290a85d8b63ee36c39dfbf4348d120f6e561ae.json new file mode 100644 index 0000000000000..7f9886ccb603e --- /dev/null +++ b/backend/.sqlx/query-61bed1bc6d3e6a3c1d640eeacc290a85d8b63ee36c39dfbf4348d120f6e561ae.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE \n postgres_trigger \n SET\n last_server_ping = NULL \n WHERE \n workspace_id = $1 \n AND path = $2 \n AND server_id IS NULL", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "61bed1bc6d3e6a3c1d640eeacc290a85d8b63ee36c39dfbf4348d120f6e561ae" +} diff --git a/backend/.sqlx/query-62475252dcf54f32433b97ae011daf5d4205d160d2aedf463c7dfe944e93257a.json 
b/backend/.sqlx/query-62475252dcf54f32433b97ae011daf5d4205d160d2aedf463c7dfe944e93257a.json index 5c32a3af00fb5..d99d288b08bf1 100644 --- a/backend/.sqlx/query-62475252dcf54f32433b97ae011daf5d4205d160d2aedf463c7dfe944e93257a.json +++ b/backend/.sqlx/query-62475252dcf54f32433b97ae011daf5d4205d160d2aedf463c7dfe944e93257a.json @@ -18,7 +18,8 @@ "websocket", "kafka", "email", - "nats" + "nats", + "postgres" ] } } diff --git a/backend/.sqlx/query-6f56acb985aa7141ea1891d7ad58a32c35d1b02fe7070c92a2e62c1a5339c396.json b/backend/.sqlx/query-6f56acb985aa7141ea1891d7ad58a32c35d1b02fe7070c92a2e62c1a5339c396.json new file mode 100644 index 0000000000000..f3bda55eecb51 --- /dev/null +++ b/backend/.sqlx/query-6f56acb985aa7141ea1891d7ad58a32c35d1b02fe7070c92a2e62c1a5339c396.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT slot_name FROM pg_replication_slots where slot_name = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "slot_name", + "type_info": "Name" + } + ], + "parameters": { + "Left": [ + "Name" + ] + }, + "nullable": [ + true + ] + }, + "hash": "6f56acb985aa7141ea1891d7ad58a32c35d1b02fe7070c92a2e62c1a5339c396" +} diff --git a/backend/.sqlx/query-71d51bbc35da7b9930e3ea3a634451217ccb9f1bc35b1ad6e10d16bc19c41447.json b/backend/.sqlx/query-71d51bbc35da7b9930e3ea3a634451217ccb9f1bc35b1ad6e10d16bc19c41447.json index a8dcf9e0118d5..1a6121909670d 100644 --- a/backend/.sqlx/query-71d51bbc35da7b9930e3ea3a634451217ccb9f1bc35b1ad6e10d16bc19c41447.json +++ b/backend/.sqlx/query-71d51bbc35da7b9930e3ea3a634451217ccb9f1bc35b1ad6e10d16bc19c41447.json @@ -29,7 +29,8 @@ "websocket", "kafka", "email", - "nats" + "nats", + "postgres" ] } } diff --git a/backend/.sqlx/query-74d928f4c3f0de191f414471b9a4fbe9c20f9685b06ad5bbded424948b2dc88c.json b/backend/.sqlx/query-74d928f4c3f0de191f414471b9a4fbe9c20f9685b06ad5bbded424948b2dc88c.json new file mode 100644 index 0000000000000..0f72e41b9fa69 --- /dev/null +++ b/backend/.sqlx/query-74d928f4c3f0de191f414471b9a4fbe9c20f9685b06ad5bbded424948b2dc88c.json @@ -0,0 +1,25 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE \n capture_config \n SET \n server_id = $1,\n last_server_ping = now(), \n error = 'Connecting...' 
\n WHERE \n last_client_ping > NOW() - INTERVAL '10 seconds' AND \n workspace_id = $2 AND \n path = $3 AND \n is_flow = $4 AND \n trigger_kind = 'postgres' AND \n (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') \n RETURNING true\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Varchar", + "Text", + "Text", + "Bool" + ] + }, + "nullable": [ + null + ] + }, + "hash": "74d928f4c3f0de191f414471b9a4fbe9c20f9685b06ad5bbded424948b2dc88c" +} diff --git a/backend/.sqlx/query-9116102c6ccad5b0d752d5d690c233dfe48062aef23072b4f4ae4ab5ca269082.json b/backend/.sqlx/query-9116102c6ccad5b0d752d5d690c233dfe48062aef23072b4f4ae4ab5ca269082.json new file mode 100644 index 0000000000000..6e58a79dd356f --- /dev/null +++ b/backend/.sqlx/query-9116102c6ccad5b0d752d5d690c233dfe48062aef23072b4f4ae4ab5ca269082.json @@ -0,0 +1,25 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE \n postgres_trigger\n SET \n last_server_ping = now(),\n error = $1\n WHERE\n workspace_id = $2\n AND path = $3\n AND server_id = $4 \n AND enabled IS TRUE\n RETURNING 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "9116102c6ccad5b0d752d5d690c233dfe48062aef23072b4f4ae4ab5ca269082" +} diff --git a/backend/.sqlx/query-baa1dddc616419bf4b923715f0a863bc0ff69c98db0f0c8f55e4ac89fdde7a60.json b/backend/.sqlx/query-baa1dddc616419bf4b923715f0a863bc0ff69c98db0f0c8f55e4ac89fdde7a60.json new file mode 100644 index 0000000000000..e772354ea85f4 --- /dev/null +++ b/backend/.sqlx/query-baa1dddc616419bf4b923715f0a863bc0ff69c98db0f0c8f55e4ac89fdde7a60.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT pubname FROM pg_publication WHERE pubname = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "pubname", + "type_info": "Name" + } + ], + "parameters": { + "Left": [ + "Name" + ] + }, + "nullable": [ + false + ] + }, + "hash": "baa1dddc616419bf4b923715f0a863bc0ff69c98db0f0c8f55e4ac89fdde7a60" +} diff --git a/backend/.sqlx/query-bfc534d87d701d7ac78cc97d0054d829165ba3f22fba75c3161e4cddb72264ee.json b/backend/.sqlx/query-bfc534d87d701d7ac78cc97d0054d829165ba3f22fba75c3161e4cddb72264ee.json new file mode 100644 index 0000000000000..76f0f1486fecd --- /dev/null +++ b/backend/.sqlx/query-bfc534d87d701d7ac78cc97d0054d829165ba3f22fba75c3161e4cddb72264ee.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE \n postgres_trigger \n SET \n enabled = FALSE, \n error = $1, \n server_id = NULL, \n last_server_ping = NULL \n WHERE \n workspace_id = $2 AND \n path = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "bfc534d87d701d7ac78cc97d0054d829165ba3f22fba75c3161e4cddb72264ee" +} diff --git a/backend/.sqlx/query-c223f8b7fa4ef1aa06e1ba2a56d677774aa237508d5610714efd2e9b8b93c7b8.json b/backend/.sqlx/query-c223f8b7fa4ef1aa06e1ba2a56d677774aa237508d5610714efd2e9b8b93c7b8.json index 4d79556b49215..b5c5aa2cb7b03 100644 --- a/backend/.sqlx/query-c223f8b7fa4ef1aa06e1ba2a56d677774aa237508d5610714efd2e9b8b93c7b8.json +++ b/backend/.sqlx/query-c223f8b7fa4ef1aa06e1ba2a56d677774aa237508d5610714efd2e9b8b93c7b8.json @@ -21,7 +21,8 @@ "websocket", "kafka", "email", - "nats" + "nats", + "postgres" ] } } diff --git 
a/backend/.sqlx/query-c5270ee815689e42b65df507b850da43239c9a5aaea41c9aed7ed33a6219a534.json b/backend/.sqlx/query-c5270ee815689e42b65df507b850da43239c9a5aaea41c9aed7ed33a6219a534.json index b7ac98ff249a1..6f08506fdfa0c 100644 --- a/backend/.sqlx/query-c5270ee815689e42b65df507b850da43239c9a5aaea41c9aed7ed33a6219a534.json +++ b/backend/.sqlx/query-c5270ee815689e42b65df507b850da43239c9a5aaea41c9aed7ed33a6219a534.json @@ -18,7 +18,8 @@ "websocket", "kafka", "email", - "nats" + "nats", + "postgres" ] } } diff --git a/backend/.sqlx/query-e17ec84003e2ec414622d100f5dfdda86bee33f31835317df512a20c805b35d7.json b/backend/.sqlx/query-e17ec84003e2ec414622d100f5dfdda86bee33f31835317df512a20c805b35d7.json index 2ef165828ec74..7e7155a6b4595 100644 --- a/backend/.sqlx/query-e17ec84003e2ec414622d100f5dfdda86bee33f31835317df512a20c805b35d7.json +++ b/backend/.sqlx/query-e17ec84003e2ec414622d100f5dfdda86bee33f31835317df512a20c805b35d7.json @@ -26,7 +26,8 @@ "websocket", "kafka", "email", - "nats" + "nats", + "postgres" ] } } diff --git a/backend/.sqlx/query-e23e110e1f0438d21534fc4323e0e7bc1f0dbeca2e4f44ced05bae0ca5ca1039.json b/backend/.sqlx/query-e23e110e1f0438d21534fc4323e0e7bc1f0dbeca2e4f44ced05bae0ca5ca1039.json index 950d7662ad1d4..488355d34ebbb 100644 --- a/backend/.sqlx/query-e23e110e1f0438d21534fc4323e0e7bc1f0dbeca2e4f44ced05bae0ca5ca1039.json +++ b/backend/.sqlx/query-e23e110e1f0438d21534fc4323e0e7bc1f0dbeca2e4f44ced05bae0ca5ca1039.json @@ -34,7 +34,8 @@ "websocket", "kafka", "email", - "nats" + "nats", + "postgres" ] } } diff --git a/backend/.sqlx/query-fd5754fe3c6346ae28818a9d60d144a40f8884f47e5bbdd2824e939dafd8f154.json b/backend/.sqlx/query-fd5754fe3c6346ae28818a9d60d144a40f8884f47e5bbdd2824e939dafd8f154.json new file mode 100644 index 0000000000000..e797ab88052cb --- /dev/null +++ b/backend/.sqlx/query-fd5754fe3c6346ae28818a9d60d144a40f8884f47e5bbdd2824e939dafd8f154.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n schemaname AS schema_name,\n tablename AS table_name,\n CASE\n WHEN array_length(attnames, 1) = (SELECT COUNT(*) FROM information_schema.columns WHERE table_schema = pg_publication_tables.schemaname AND table_name = pg_publication_tables.tablename)\n THEN NULL\n ELSE attnames\n END AS columns,\n rowfilter AS where_clause\n FROM\n pg_publication_tables\n WHERE\n pubname = $1;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "schema_name", + "type_info": "Name" + }, + { + "ordinal": 1, + "name": "table_name", + "type_info": "Name" + }, + { + "ordinal": 2, + "name": "columns", + "type_info": "NameArray" + }, + { + "ordinal": 3, + "name": "where_clause", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Name" + ] + }, + "nullable": [ + true, + true, + null, + true + ] + }, + "hash": "fd5754fe3c6346ae28818a9d60d144a40f8884f47e5bbdd2824e939dafd8f154" +} diff --git a/backend/migrations/20250204192651_add_postgres_type_value_to_trigger_kind_type.down.sql b/backend/migrations/20250204192651_add_postgres_type_value_to_trigger_kind_type.down.sql new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/backend/migrations/20250204192651_add_postgres_type_value_to_trigger_kind_type.up.sql b/backend/migrations/20250204192651_add_postgres_type_value_to_trigger_kind_type.up.sql new file mode 100644 index 0000000000000..35c37f75cea20 --- /dev/null +++ b/backend/migrations/20250204192651_add_postgres_type_value_to_trigger_kind_type.up.sql @@ -0,0 +1,2 @@ +-- Add up migration script here +ALTER TYPE TRIGGER_KIND ADD VALUE IF NOT EXISTS 
'postgres'; \ No newline at end of file diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index 9f2ece7d4b32d..a71090433446b 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -8760,6 +8760,34 @@ paths: schema: type: string + /w/{workspace}/postgres_triggers/test: + post: + summary: test postgres connection + operationId: testPostgresConnection + tags: + - postgres_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + requestBody: + description: test postgres connection + required: true + content: + application/json: + schema: + type: object + properties: + database: + type: string + required: + - database + responses: + "200": + description: successfuly connected to postgres + content: + text/plain: + schema: + type: string + /groups/list: get: summary: list instance groups @@ -13323,7 +13351,7 @@ components: type: string required: - transaction_to_track - + TableToTrack: type: array items: @@ -14644,7 +14672,7 @@ components: CaptureTriggerKind: type: string - enum: [webhook, http, websocket, kafka, email, nats] + enum: [webhook, http, websocket, kafka, email, nats, postgres] Capture: type: object diff --git a/backend/windmill-api/src/capture.rs b/backend/windmill-api/src/capture.rs index 4b933b01b4225..664f874c0c5bb 100644 --- a/backend/windmill-api/src/capture.rs +++ b/backend/windmill-api/src/capture.rs @@ -38,6 +38,16 @@ use crate::http_triggers::{build_http_trigger_extra, HttpMethod}; use crate::kafka_triggers_ee::KafkaTriggerConfigConnection; #[cfg(all(feature = "enterprise", feature = "nats"))] use crate::nats_triggers_ee::NatsTriggerConfigConnection; +#[cfg(feature = "postgres_trigger")] +use crate::postgres_triggers::{ + create_logical_replication_slot_query, create_publication_query, drop_publication_query, + generate_random_string, get_database_connection, PublicationData, +}; +#[cfg(feature = "postgres_trigger")] +use itertools::Itertools; +#[cfg(feature = "postgres_trigger")] +use pg_escape::quote_literal; + use crate::{ args::WebhookArgs, db::{ApiAuthed, DB}, @@ -78,7 +88,7 @@ pub fn workspaced_unauthed_service() -> Router { } } -#[derive(sqlx::Type, Serialize, Deserialize)] +#[derive(sqlx::Type, Serialize, Deserialize, Debug)] #[sqlx(type_name = "TRIGGER_KIND", rename_all = "lowercase")] #[serde(rename_all = "lowercase")] pub enum TriggerKind { @@ -88,6 +98,7 @@ pub enum TriggerKind { Kafka, Email, Nats, + Postgres, } impl fmt::Display for TriggerKind { @@ -99,6 +110,7 @@ impl fmt::Display for TriggerKind { TriggerKind::Kafka => "kafka", TriggerKind::Email => "email", TriggerKind::Nats => "nats", + TriggerKind::Postgres => "postgres", }; write!(f, "{}", s) } @@ -133,6 +145,16 @@ pub struct NatsTriggerConfig { pub use_jetstream: bool, } +#[cfg(feature = "postgres_trigger")] +#[derive(Serialize, Deserialize, Debug)] +pub struct PostgresTriggerConfig { + pub postgres_resource_path: String, + pub publication_name: Option, + pub replication_slot_name: Option, + pub publication: PublicationData, +} + +#[cfg(feature = "websocket")] #[derive(Serialize, Deserialize, Debug)] pub struct WebsocketTriggerConfig { pub url: String, @@ -145,6 +167,9 @@ pub struct WebsocketTriggerConfig { enum TriggerConfig { #[cfg(feature = "http_trigger")] Http(HttpTriggerConfig), + #[cfg(feature = "postgres_trigger")] + Postgres(PostgresTriggerConfig), + #[cfg(feature = "websocket")] Websocket(WebsocketTriggerConfig), #[cfg(all(feature = "enterprise", feature = "kafka"))] Kafka(KafkaTriggerConfig), @@ -186,18 
+211,88 @@ async fn get_configs( ) .fetch_all(&mut *tx) .await?; - tx.commit().await?; Ok(Json(configs)) } +#[cfg(feature = "postgres_trigger")] +async fn set_postgres_trigger_config( + w_id: &str, + authed: ApiAuthed, + db: &DB, + user_db: UserDB, + mut capture_config: NewCaptureConfig, +) -> Result { + let Some(TriggerConfig::Postgres(mut postgres_config)) = capture_config.trigger_config else { + return Err(windmill_common::error::Error::BadRequest( + "Invalid postgres config".to_string(), + )); + }; + + let mut connection = get_database_connection( + authed, + Some(user_db), + &db, + &postgres_config.postgres_resource_path, + &w_id, + ) + .await?; + + let publication_name = postgres_config + .publication_name + .get_or_insert(format!("windmill_capture_{}", generate_random_string())); + let replication_slot_name = postgres_config + .replication_slot_name + .get_or_insert(publication_name.clone()); + + let query = drop_publication_query(&publication_name); + + sqlx::query(&query).execute(&mut connection).await?; + + let query = create_publication_query( + &publication_name, + postgres_config.publication.table_to_track.as_deref(), + &postgres_config + .publication + .transaction_to_track + .iter() + .map(AsRef::as_ref) + .collect_vec(), + ); + + sqlx::query(&query).execute(&mut connection).await?; + + let query = format!( + "SELECT 1 from pg_replication_slots WHERE slot_name = {}", + quote_literal(replication_slot_name) + ); + + let row = sqlx::query(&query).fetch_optional(&mut connection).await?; + + if row.is_none() { + let query = create_logical_replication_slot_query(&replication_slot_name); + sqlx::query(&query).execute(&mut connection).await?; + } + capture_config.trigger_config = Some(TriggerConfig::Postgres(postgres_config)); + Ok(capture_config) +} + async fn set_config( authed: ApiAuthed, Extension(user_db): Extension, + #[cfg(feature = "postgres_trigger")] Extension(db): Extension, Path(w_id): Path, Json(nc): Json, ) -> Result<()> { + #[cfg(feature = "postgres_trigger")] + let nc = if let TriggerKind::Postgres = nc.trigger_kind { + set_postgres_trigger_config(&w_id, authed.clone(), &db, user_db.clone(), nc).await? 
+ } + else { + nc + }; + let mut tx = user_db.begin(&authed).await?; sqlx::query!( diff --git a/backend/windmill-api/src/http_triggers.rs b/backend/windmill-api/src/http_triggers.rs index f09451c3af207..0d6be258e1659 100644 --- a/backend/windmill-api/src/http_triggers.rs +++ b/backend/windmill-api/src/http_triggers.rs @@ -75,7 +75,7 @@ pub fn workspaced_service() -> Router { .route("/route_exists", post(exists_route)) } -#[derive(Serialize, Deserialize, sqlx::Type)] +#[derive(Serialize, Deserialize, sqlx::Type, Debug)] #[sqlx(type_name = "HTTP_METHOD", rename_all = "lowercase")] #[serde(rename_all = "lowercase")] pub enum HttpMethod { diff --git a/backend/windmill-api/src/postgres_triggers/handler.rs b/backend/windmill-api/src/postgres_triggers/handler.rs index dcf350bd917db..9c37e0968f391 100644 --- a/backend/windmill-api/src/postgres_triggers/handler.rs +++ b/backend/windmill-api/src/postgres_triggers/handler.rs @@ -1,9 +1,6 @@ -use std::{ - collections::{ - hash_map::Entry::{Occupied, Vacant}, - HashMap, - }, - str::FromStr, +use std::collections::{ + hash_map::Entry::{Occupied, Vacant}, + HashMap, }; use crate::{ @@ -14,29 +11,28 @@ use axum::{ extract::{Path, Query}, Extension, Json, }; -use chrono::Utc; use http::StatusCode; use itertools::Itertools; use pg_escape::{quote_identifier, quote_literal}; use quick_cache::sync::Cache; -use rand::Rng; use rust_postgres::types::Type; use serde::{Deserialize, Deserializer, Serialize}; use sql_builder::{bind::Bind, SqlBuilder}; -use sqlx::{ - postgres::{types::Oid, PgConnectOptions, PgSslMode}, - Connection, FromRow, PgConnection, QueryBuilder, -}; +use sqlx::{postgres::types::Oid, FromRow, PgConnection}; use windmill_audit::{audit_ee::audit_log, ActionKind}; use windmill_common::error::Error; use windmill_common::{ db::UserDB, - error::{self, JsonResult}, + error::{self, JsonResult, Result}, utils::{not_found_if_none, paginate, Pagination, StripPath}, worker::CLOUD_HOSTED, }; -use super::get_database_resource; +use super::{ + create_logical_replication_slot_query, create_publication_query, + drop_logical_replication_slot_query, drop_publication_query, generate_random_string, + get_database_connection, ERROR_PUBLICATION_NAME_NOT_EXISTS, ERROR_REPLICATION_SLOT_NOT_EXISTS, +}; use lazy_static::lazy_static; #[derive(FromRow, Serialize, Deserialize, Debug)] @@ -88,7 +84,7 @@ impl Relations { } } -#[derive(Deserialize)] +#[derive(Debug, Deserialize)] pub struct EditPostgresTrigger { replication_slot_name: String, publication_name: String, @@ -112,48 +108,39 @@ pub struct NewPostgresTrigger { publication: Option, } -pub async fn get_database_connection( - authed: ApiAuthed, - user_db: Option, - db: &DB, - postgres_resource_path: &str, - w_id: &str, -) -> Result { - let database = get_database_resource(authed, user_db, db, postgres_resource_path, w_id).await?; - - Ok(get_raw_postgres_connection(&database).await?) +#[derive(Serialize, Deserialize)] +pub struct TestPostgres { + pub postgres_resource_path: String, } -pub async fn get_raw_postgres_connection(db: &Database) -> Result { - let options = { - let sslmode = if !db.sslmode.is_empty() { - PgSslMode::from_str(&db.sslmode)? 
- } else { - PgSslMode::Prefer - }; - let options = PgConnectOptions::new() - .host(&db.host) - .database(&db.dbname) - .port(db.port) - .ssl_mode(sslmode) - .username(&db.user); - - let options = if !db.root_certificate_pem.is_empty() { - options.ssl_root_cert_from_pem(db.root_certificate_pem.as_bytes().to_vec()) - } else { - options - }; - - if !db.password.is_empty() { - options.password(&db.password) - } else { - options - } - }; - PgConnection::connect_with(&options) +pub async fn test_postgres_connection( + authed: ApiAuthed, + Extension(db): Extension, + Extension(user_db): Extension, + Path(workspace_id): Path, + Json(test_postgres): Json, +) -> Result<()> { + let connect_f = async { + get_database_connection( + authed, + Some(user_db), + &db, + &test_postgres.postgres_resource_path, + &workspace_id, + ) .await - .map_err(|e| e.into()) + .map_err(|err| { + error::Error::BadConfig(format!("Error connecting to postgres: {}", err.to_string())) + }) + }; + tokio::time::timeout(tokio::time::Duration::from_secs(30), connect_f) + .await + .map_err(|_| { + error::Error::BadConfig(format!("Timeout connecting to postgres after 30 seconds")) + })??; + + Ok(()) } #[derive(Deserialize, Debug)] @@ -165,19 +152,20 @@ pub enum Language { #[derive(Debug, Deserialize)] pub struct TemplateScript { postgres_resource_path: String, - #[serde(deserialize_with = "check_if_not_duplication_relation")] + #[serde(deserialize_with = "check_if_valid_relation")] relations: Option>, language: Language, } -fn check_if_not_duplication_relation<'de, D>( +fn check_if_valid_relation<'de, D>( relations: D, ) -> std::result::Result>, D::Error> where D: Deserializer<'de>, { let relations: Option> = Option::deserialize(relations)?; - + let mut track_all_table_in_schema = false; + let mut track_specific_columns_in_table = false; match relations { Some(relations) => { for relation in relations.iter() { @@ -187,12 +175,25 @@ where )); } + if !track_all_table_in_schema && relation.table_to_track.is_empty() { + track_all_table_in_schema = true; + continue; + } + for table_to_track in relation.table_to_track.iter() { if table_to_track.table_name.trim().is_empty() { return Err(serde::de::Error::custom( "Table name must not be empty".to_string(), )); } + + if !track_specific_columns_in_table && !table_to_track.columns_name.is_empty() { + track_specific_columns_in_table = true; + } + } + + if track_all_table_in_schema && track_specific_columns_in_table { + return Err(serde::de::Error::custom("Incompatible tracking options. Schema-level tracking and specific table tracking with column selection cannot be used together. 
Refer to the documentation for valid configurations.")); } } @@ -249,13 +250,105 @@ pub struct SetEnabled { pub enabled: bool, } +#[derive(Serialize, Deserialize)] +pub struct PostgresPublicationReplication { + publication_name: String, + replication_slot_name: String, +} + +impl PostgresPublicationReplication { + pub fn new( + publication_name: String, + replication_slot_name: String, + ) -> PostgresPublicationReplication { + PostgresPublicationReplication { publication_name, replication_slot_name } + } +} + +async fn check_if_publication_exist( + connection: &mut PgConnection, + publication_name: &str, +) -> Result<()> { + sqlx::query!( + "SELECT pubname FROM pg_publication WHERE pubname = $1", + publication_name + ) + .fetch_one(connection) + .await + .map_err(|err| match err { + sqlx::Error::RowNotFound => { + Error::BadRequest(ERROR_PUBLICATION_NAME_NOT_EXISTS.to_string()) + } + err => Error::SqlErr { error: err, location: "pg_trigger".to_string() }, + })?; + Ok(()) +} + +async fn check_if_logical_replication_slot_exist( + connection: &mut PgConnection, + replication_slot_name: &str, +) -> Result<()> { + sqlx::query!( + "SELECT slot_name FROM pg_replication_slots where slot_name = $1", + &replication_slot_name + ) + .fetch_one(connection) + .await + .map_err(|err| match err { + _ => Error::BadRequest(ERROR_REPLICATION_SLOT_NOT_EXISTS.to_string()), + })?; + Ok(()) +} + +async fn create_custom_slot_and_publication_inner( + authed: ApiAuthed, + user_db: UserDB, + db: &DB, + postgres_resource_path: &str, + w_id: &str, + publication: &PublicationData, +) -> Result { + let publication_name = format!("windmill_trigger_{}", generate_random_string()); + let replication_slot_name = publication_name.clone(); + + let query = create_publication_query( + &publication_name, + publication.table_to_track.as_deref(), + &publication + .transaction_to_track + .iter() + .map(AsRef::as_ref) + .collect_vec(), + ); + + let mut connection = get_database_connection( + authed.clone(), + Some(user_db.clone()), + &db, + &postgres_resource_path, + &w_id, + ) + .await?; + + sqlx::query(&query).execute(&mut connection).await?; + + let query = create_logical_replication_slot_query(&replication_slot_name); + + sqlx::query(&query).execute(&mut connection).await?; + + Ok(PostgresPublicationReplication::new( + publication_name, + replication_slot_name, + )) +} + pub async fn create_postgres_trigger( authed: ApiAuthed, Extension(user_db): Extension, Extension(db): Extension, Path(w_id): Path, Json(new_postgres_trigger): Json, -) -> error::Result<(StatusCode, String)> { +) -> Result<(StatusCode, String)> { if *CLOUD_HOSTED { return Err(error::Error::BadRequest( "Postgres triggers are not supported on multi-tenant cloud, use dedicated cloud or self-host".to_string(), @@ -278,59 +371,32 @@ pub async fn create_postgres_trigger( "Publication data is missing".to_string(), )); } - - let create_slot = replication_slot_name.is_none(); - let create_publication = publication_name.is_none(); - - let name; - let mut pub_name = publication_name.as_deref().unwrap_or_default(); - let mut slot_name = replication_slot_name.as_deref().unwrap_or_default(); - if create_publication || create_slot { - let generate_random_string = move || { - let timestamp = Utc::now().timestamp_millis().to_string(); - let mut rng = rand::rng(); - let charset = "abcdefghijklmnopqrstuvwxyz0123456789"; - - let random_part = (0..10) - .map(|_| { - charset - .chars() - .nth(rng.random_range(0..charset.len())) - .unwrap() - }) - .collect::(); - - format!("{}_{}", 
timestamp, random_part) - }; - - name = format!("windmill_{}", generate_random_string()); - pub_name = &name; - slot_name = &name; - let publication = publication.unwrap(); - - let mut connection = get_database_connection( - authed.clone(), - Some(user_db.clone()), - &db, - &postgres_resource_path, - &w_id, - ) - .await?; - - new_publication( - &mut connection, - pub_name, - publication.table_to_track.as_deref(), - &publication - .transaction_to_track - .iter() - .map(AsRef::as_ref) - .collect_vec(), - ) - .await?; - - new_slot(&mut connection, slot_name).await?; - } + let (pub_name, slot_name) = if publication_name.is_none() && replication_slot_name.is_none() { + if publication.is_none() { + return Err(Error::BadRequest("publication must be set".to_string())); + } + let PostgresPublicationReplication { publication_name, replication_slot_name } = + create_custom_slot_and_publication_inner( + authed.clone(), + user_db.clone(), + &db, + &postgres_resource_path, + &w_id, + &publication.unwrap(), + ) + .await?; + + (publication_name, replication_slot_name) + } else { + if publication_name.is_none() { + return Err(Error::BadRequest("Missing publication name".to_string())); + } else if replication_slot_name.is_none() { + return Err(Error::BadRequest( + "Missing replication slot name".to_string(), + )); + } + (replication_slot_name.unwrap(), publication_name.unwrap()) + }; let mut tx = user_db.begin(&authed).await?; @@ -450,10 +516,10 @@ pub async fn list_postgres_triggers( #[derive(Deserialize, Serialize, Debug)] pub struct PublicationData { - #[serde(default, deserialize_with = "check_if_not_duplication_relation")] - table_to_track: Option>, + #[serde(default, deserialize_with = "check_if_valid_relation")] + pub table_to_track: Option>, #[serde(deserialize_with = "check_if_valid_transaction_type")] - transaction_to_track: Vec, + pub transaction_to_track: Vec, } fn check_if_valid_transaction_type<'de, D>( @@ -509,7 +575,7 @@ pub async fn list_slot_name( Extension(user_db): Extension, Extension(db): Extension, Path((w_id, postgres_resource_path)): Path<(String, String)>, -) -> error::Result>> { +) -> Result>> { let mut connection = get_database_connection( authed.clone(), Some(user_db.clone()), @@ -543,28 +609,13 @@ pub struct Slot { name: String, } -async fn new_slot(connection: &mut PgConnection, name: &str) -> error::Result<()> { - let query = format!( - r#" - SELECT - * - FROM - pg_create_logical_replication_slot({}, 'pgoutput');"#, - quote_literal(&name) - ); - - sqlx::query(&query).execute(connection).await?; - - Ok(()) -} - pub async fn create_slot( authed: ApiAuthed, Extension(user_db): Extension, Extension(db): Extension, Path((w_id, postgres_resource_path)): Path<(String, String)>, Json(Slot { name }): Json, -) -> error::Result { +) -> Result { let mut connection = get_database_connection( authed.clone(), Some(user_db.clone()), @@ -574,7 +625,9 @@ pub async fn create_slot( ) .await?; - new_slot(&mut connection, &name).await?; + let query = create_logical_replication_slot_query(&name); + + sqlx::query(&query).execute(&mut connection).await?; Ok(format!("Slot {} created!", name)) } @@ -585,7 +638,7 @@ pub async fn drop_slot_name( Extension(db): Extension, Path((w_id, postgres_resource_path)): Path<(String, String)>, Json(Slot { name }): Json, -) -> error::Result { +) -> Result { let mut connection = get_database_connection( authed.clone(), Some(user_db.clone()), @@ -595,7 +648,7 @@ pub async fn drop_slot_name( ) .await?; - let query = format!("SELECT pg_drop_replication_slot({});", 
quote_literal(&name)); + let query = drop_logical_replication_slot_query(&name); sqlx::query(&query).execute(&mut connection).await?; Ok(format!("Slot name {} deleted!", name)) @@ -610,7 +663,7 @@ pub async fn list_database_publication( Extension(user_db): Extension, Extension(db): Extension, Path((w_id, postgres_resource_path)): Path<(String, String)>, -) -> error::Result>> { +) -> Result>> { let mut connection = get_database_connection( authed.clone(), Some(user_db.clone()), @@ -640,7 +693,7 @@ pub async fn get_publication_info( Extension(user_db): Extension, Extension(db): Extension, Path((w_id, publication_name, postgres_resource_path)): Path<(String, String, String)>, -) -> error::Result> { +) -> Result> { let mut connection = get_database_connection( authed.clone(), Some(user_db.clone()), @@ -651,13 +704,13 @@ pub async fn get_publication_info( .await?; let publication_data = - get_publication_scope_and_transaction(&publication_name, &mut connection).await; + get_publication_scope_and_transaction(&mut connection, &publication_name).await; - let (all_table, transaction_to_track) = match publication_data { + let (all_table, transaction_to_track) = match publication_data { Ok(pub_data) => pub_data, Err(Error::SqlErr { error: sqlx::Error::RowNotFound, .. }) => { return Err(Error::NotFound( - "Publication was not found, please create a new publication".to_string(), + ERROR_PUBLICATION_NAME_NOT_EXISTS.to_string(), )) } Err(e) => return Err(e), @@ -674,82 +727,13 @@ pub async fn get_publication_info( ))) } -async fn new_publication( - connection: &mut PgConnection, - publication_name: &str, - table_to_track: Option<&[Relations]>, - transaction_to_track: &[&str], -) -> Result<(), Error> { - let mut query = QueryBuilder::new("CREATE PUBLICATION "); - - query.push(quote_identifier(publication_name)); - - match table_to_track { - Some(database_component) if !database_component.is_empty() => { - query.push(" FOR"); - for (i, schema) in database_component.iter().enumerate() { - if schema.table_to_track.is_empty() { - query.push(" TABLES IN SCHEMA "); - query.push(quote_identifier(&schema.schema_name)); - } else { - query.push(" TABLE ONLY "); - for (j, table) in schema.table_to_track.iter().enumerate() { - let table_name = quote_identifier(&table.table_name); - let schema_name = quote_identifier(&schema.schema_name); - let full_name = format!("{}.{}", &schema_name, &table_name); - query.push(full_name); - if !table.columns_name.is_empty() { - query.push(" ("); - let columns = table - .columns_name - .iter() - .map(|column| quote_identifier(column)) - .join(", "); - query.push(&columns); - query.push(")"); - } - - if let Some(where_clause) = &table.where_clause { - query.push(" WHERE ("); - query.push(where_clause); - query.push(')'); - } - - if j + 1 != schema.table_to_track.len() { - query.push(", "); - } - } - } - if i < database_component.len() - 1 { - query.push(", "); - } - } - } - _ => { - query.push(" FOR ALL TABLES "); - } - }; - - if !transaction_to_track.is_empty() { - let transactions = || transaction_to_track.iter().join(", "); - query.push(" WITH (publish = '"); - query.push(transactions()); - query.push("');"); - } - - let query = query.build(); - query.execute(&mut *connection).await?; - - Ok(()) -} - pub async fn create_publication( authed: ApiAuthed, Extension(user_db): Extension, Extension(db): Extension, Path((w_id, publication_name, postgres_resource_path)): Path<(String, String, String)>, Json(publication_data): Json, -) -> error::Result { +) -> Result { let PublicationData { 
table_to_track, transaction_to_track } = publication_data; let mut connection = get_database_connection( @@ -761,13 +745,13 @@ pub async fn create_publication( ) .await?; - new_publication( - &mut connection, + let query = create_publication_query( &publication_name, table_to_track.as_deref(), &transaction_to_track.iter().map(AsRef::as_ref).collect_vec(), - ) - .await?; + ); + + sqlx::query(&query).execute(&mut connection).await?; Ok(format!( "Publication {} successfully created!", @@ -775,24 +759,12 @@ pub async fn create_publication( )) } -async fn drop_publication( - publication_name: &str, - connection: &mut PgConnection, -) -> Result<(), Error> { - let mut query = QueryBuilder::new("DROP PUBLICATION IF EXISTS "); - let quoted_publication_name = quote_identifier(publication_name); - query.push(quoted_publication_name); - query.push(";"); - query.build().execute(&mut *connection).await?; - Ok(()) -} - pub async fn delete_publication( authed: ApiAuthed, Extension(user_db): Extension, Extension(db): Extension, Path((w_id, publication_name, postgres_resource_path)): Path<(String, String, String)>, -) -> error::Result { +) -> Result { let mut connection = get_database_connection( authed.clone(), Some(user_db.clone()), @@ -802,7 +774,9 @@ pub async fn delete_publication( ) .await?; - drop_publication(&publication_name, &mut connection).await?; + let query = drop_publication_query(&publication_name); + + sqlx::query(&query).execute(&mut connection).await?; Ok(format!( "Publication {} successfully deleted!", @@ -810,65 +784,61 @@ pub async fn delete_publication( )) } -async fn update_publication( - connection: &mut PgConnection, +pub fn get_update_publication_query( publication_name: &str, PublicationData { table_to_track, transaction_to_track }: PublicationData, -) -> error::Result { - let (all_table, _) = - get_publication_scope_and_transaction(&publication_name, connection).await?; - - let mut query = QueryBuilder::new(""); + all_table: bool, +) -> Vec { let quoted_publication_name = quote_identifier(&publication_name); let transaction_to_track_as_str = transaction_to_track.iter().join(","); - + let mut queries = Vec::with_capacity(2); match table_to_track { Some(ref relations) if !relations.is_empty() => { if all_table { - drop_publication(&publication_name, connection).await?; - new_publication( - connection, + queries.push(drop_publication_query(&publication_name)); + queries.push(create_publication_query( &publication_name, table_to_track.as_deref(), &transaction_to_track.iter().map(AsRef::as_ref).collect_vec(), - ) - .await?; + )); } else { - query.push("ALTER PUBLICATION "); - query.push("ed_publication_name); - query.push(" SET"); + let mut query = String::from(""); + + query.push_str("ALTER PUBLICATION "); + query.push_str("ed_publication_name); + query.push_str(" SET"); for (i, schema) in relations.iter().enumerate() { if schema.table_to_track.is_empty() { - query.push(" TABLES IN SCHEMA "); + query.push_str(" TABLES IN SCHEMA "); let quoted_schema = quote_identifier(&schema.schema_name); - query.push("ed_schema); + query.push_str("ed_schema); } else { - query.push(" TABLE ONLY "); + query.push_str(" TABLE ONLY "); for (j, table) in schema.table_to_track.iter().enumerate() { let table_name = quote_identifier(&table.table_name); let schema_name = quote_identifier(&schema.schema_name); let full_name = format!("{}.{}", &schema_name, &table_name); - query.push(&full_name); + query.push_str(&full_name); if !table.columns_name.is_empty() { - query.push(" ("); + query.push_str(" ("); let 
columns = table .columns_name .iter() .map(|column| quote_identifier(column)) .join(", "); - query.push(&columns); - query.push(") "); + query.push_str(&columns); + query.push_str(") "); } if let Some(where_clause) = &table.where_clause { - query.push(" WHERE ("); - query.push(where_clause); + query.push_str(" WHERE ("); + query.push_str(where_clause); query.push(')'); } if j + 1 != schema.table_to_track.len() { - query.push(", "); + query.push_str(", "); } } } @@ -876,36 +846,35 @@ async fn update_publication( query.push(','); } } - query.push(";"); - query.build().execute(&mut *connection).await?; - query.reset(); - query.push("ALTER PUBLICATION "); - query.push("ed_publication_name); - query.push(format!( + query.push(';'); + + queries.push(query); + + let mut query = String::new(); + + query.push_str("ALTER PUBLICATION "); + query.push_str("ed_publication_name); + query.push_str(&format!( " SET (publish = '{}');", transaction_to_track_as_str )); + queries.push(query); } } _ => { - drop_publication(&publication_name, connection).await?; + queries.push(drop_publication_query(&publication_name)); let to_execute = format!( r#" CREATE - PUBLICATION {} FOR ALL TABLES WITH (publish = '{}') + PUBLICATION {} FOR ALL TABLES WITH (publish = '{}'); "#, quoted_publication_name, transaction_to_track_as_str ); - query.push(&to_execute); + queries.push(to_execute); } }; - query.build().execute(&mut *connection).await?; - - Ok(format!( - "Publication {} successfully updated!", - publication_name - )) + queries } pub async fn alter_publication( @@ -914,7 +883,7 @@ pub async fn alter_publication( Extension(db): Extension, Path((w_id, publication_name, postgres_resource_path)): Path<(String, String, String)>, Json(publication_data): Json, -) -> error::Result { +) -> Result { let mut connection = get_database_connection( authed.clone(), Some(user_db.clone()), @@ -923,15 +892,28 @@ pub async fn alter_publication( &w_id, ) .await?; - let message = update_publication(&mut connection, &publication_name, publication_data).await?; - Ok(message) + check_if_publication_exist(&mut connection, &publication_name).await?; + + let (all_table, _) = + get_publication_scope_and_transaction(&mut connection, &publication_name).await?; + + let queries = get_update_publication_query(&publication_name, publication_data, all_table); + + for query in queries { + sqlx::query(&query).execute(&mut connection).await?; + } + + Ok(format!( + "Publication {} updated with success", + publication_name + )) } async fn get_publication_scope_and_transaction( - publication_name: &str, connection: &mut PgConnection, -) -> Result<(bool, Vec), Error> { + publication_name: &str, +) -> std::result::Result<(bool, Vec), Error> { #[derive(Debug, Deserialize, FromRow)] struct PublicationTransaction { all_table: bool, @@ -976,7 +958,7 @@ async fn get_publication_scope_and_transaction( async fn get_tracked_relations( connection: &mut PgConnection, publication_name: &str, -) -> error::Result> { +) -> Result> { #[derive(Debug, Deserialize, FromRow)] struct PublicationData { schema_name: Option, @@ -989,14 +971,18 @@ async fn get_tracked_relations( PublicationData, r#" SELECT - schemaname AS schema_name, - tablename AS table_name, - attnames AS columns, - rowfilter AS where_clause + schemaname AS schema_name, + tablename AS table_name, + CASE + WHEN array_length(attnames, 1) = (SELECT COUNT(*) FROM information_schema.columns WHERE table_schema = pg_publication_tables.schemaname AND table_name = pg_publication_tables.tablename) + THEN NULL + ELSE attnames 
+ END AS columns, + rowfilter AS where_clause FROM pg_publication_tables WHERE - pubname = $1 + pubname = $1; "#, publication_name ) @@ -1011,7 +997,7 @@ async fn get_tracked_relations( let table_to_track = TableToTrack::new( publication.table_name.unwrap(), publication.where_clause, - publication.columns.unwrap(), + publication.columns.unwrap_or_default(), ); match entry { Occupied(mut occuped) => { @@ -1075,8 +1061,9 @@ pub async fn update_postgres_trigger( Extension(db): Extension, Path((w_id, path)): Path<(String, StripPath)>, Json(postgres_trigger): Json, -) -> error::Result { +) -> Result { let workspace_path = path.to_path(); + let EditPostgresTrigger { replication_slot_name, publication_name, @@ -1087,16 +1074,26 @@ pub async fn update_postgres_trigger( publication, } = postgres_trigger; + let mut connection = get_database_connection( + authed.clone(), + Some(user_db.clone()), + &db, + &postgres_resource_path, + &w_id, + ) + .await?; + + check_if_logical_replication_slot_exist(&mut connection, &replication_slot_name).await?; + if let Some(publication) = publication { - let mut connection = get_database_connection( - authed.clone(), - Some(user_db.clone()), - &db, - &postgres_resource_path, - &w_id, - ) - .await?; - update_publication(&mut connection, &publication_name, publication).await?; + check_if_publication_exist(&mut connection, &publication_name).await?; + let (all_table, _) = + get_publication_scope_and_transaction(&mut connection, &publication_name).await?; + + let queries = get_update_publication_query(&publication_name, publication, all_table); + for query in queries { + sqlx::query(&query).execute(&mut connection).await?; + } } let mut tx = user_db.begin(&authed).await?; @@ -1153,7 +1150,7 @@ pub async fn delete_postgres_trigger( authed: ApiAuthed, Extension(user_db): Extension, Path((w_id, path)): Path<(String, StripPath)>, -) -> error::Result { +) -> Result { let path = path.to_path(); let mut tx = user_db.begin(&authed).await?; sqlx::query!( @@ -1213,7 +1210,7 @@ pub async fn set_enabled( Extension(user_db): Extension, Path((w_id, path)): Path<(String, StripPath)>, Json(payload): Json, -) -> error::Result { +) -> Result { let mut tx = user_db.begin(&authed).await?; let path = path.to_path(); @@ -1264,7 +1261,7 @@ pub async fn set_enabled( )) } -pub async fn get_template_script(Path((_, id)): Path<(String, String)>) -> error::Result { +pub async fn get_template_script(Path((_, id)): Path<(String, String)>) -> Result { let template = if let Some((_, template)) = TEMPLATE.remove(&id) { template } else { @@ -1279,7 +1276,7 @@ pub async fn create_template_script( Extension(db): Extension, Path(w_id): Path, Json(template_script): Json, -) -> error::Result { +) -> Result { let TemplateScript { postgres_resource_path, relations, language } = template_script; if relations.is_none() { return Err(Error::BadRequest( @@ -1460,4 +1457,4 @@ pub async fn is_database_in_logical_level( }; Ok(Json(is_logical)) -} +} \ No newline at end of file diff --git a/backend/windmill-api/src/postgres_triggers/mod.rs b/backend/windmill-api/src/postgres_triggers/mod.rs index 12d7b558e0117..f45a8d5b372f5 100644 --- a/backend/windmill-api/src/postgres_triggers/mod.rs +++ b/backend/windmill-api/src/postgres_triggers/mod.rs @@ -4,8 +4,17 @@ use crate::{ resources::get_resource_value_interpolated_internal, users::fetch_api_authed, }; +use chrono::Utc; +use itertools::Itertools; +use pg_escape::{quote_identifier, quote_literal}; +use rand::Rng; use serde_json::value::RawValue; +use sqlx::{ + 
postgres::{PgConnectOptions, PgSslMode}, + Connection, PgConnection, +}; use std::collections::HashMap; +use std::str::FromStr; use axum::{ routing::{delete, get, post}, @@ -16,8 +25,10 @@ use handler::{ create_template_script, delete_postgres_trigger, delete_publication, drop_slot_name, exists_postgres_trigger, get_postgres_trigger, get_publication_info, get_template_script, is_database_in_logical_level, list_database_publication, list_postgres_triggers, - list_slot_name, set_enabled, update_postgres_trigger, Database, + list_slot_name, set_enabled, test_postgres_connection, update_postgres_trigger, Database, + Relations, }; +pub use handler::PostgresTrigger; use windmill_common::{db::UserDB, error::Error, utils::StripPath}; use windmill_queue::PushArgsOwned; @@ -30,9 +41,167 @@ mod relation; mod replication_message; mod trigger; -pub use handler::PostgresTrigger; +pub use handler::PublicationData; pub use trigger::start_database; +const ERROR_REPLICATION_SLOT_NOT_EXISTS: &str = r#"The replication slot associated with this trigger no longer exists. Recreate a new replication slot or select an existing one in the advanced tab, or delete and recreate a new trigger"#; + +const ERROR_PUBLICATION_NAME_NOT_EXISTS: &str = r#"The publication associated with this trigger no longer exists. Recreate a new publication or select an existing one in the advanced tab, or delete and recreate a new trigger"#; + +pub async fn get_database_connection( + authed: ApiAuthed, + user_db: Option, + db: &DB, + postgres_resource_path: &str, + w_id: &str, +) -> std::result::Result { + let database = get_database_resource(authed, user_db, db, postgres_resource_path, w_id).await?; + + Ok(get_raw_postgres_connection(&database).await?) +} + +pub async fn get_raw_postgres_connection( + db: &Database, +) -> std::result::Result { + let options = { + let sslmode = if !db.sslmode.is_empty() { + PgSslMode::from_str(&db.sslmode)? + } else { + PgSslMode::Prefer + }; + let options = PgConnectOptions::new() + .host(&db.host) + .database(&db.dbname) + .port(db.port) + .ssl_mode(sslmode) + .username(&db.user); + + let options = if !db.root_certificate_pem.is_empty() { + options.ssl_root_cert_from_pem(db.root_certificate_pem.as_bytes().to_vec()) + } else { + options + }; + + if !db.password.is_empty() { + options.password(&db.password) + } else { + options + } + }; + + Ok(PgConnection::connect_with(&options).await?) 
+} + +pub fn create_logical_replication_slot_query(name: &str) -> String { + let query = format!( + r#" + SELECT + * + FROM + pg_create_logical_replication_slot({}, 'pgoutput');"#, + quote_literal(&name) + ); + + query +} + +pub fn create_publication_query( + publication_name: &str, + table_to_track: Option<&[Relations]>, + transaction_to_track: &[&str], +) -> String { + let mut query = String::from("CREATE PUBLICATION "); + + query.push_str("e_identifier(publication_name)); + + match table_to_track { + Some(database_component) if !database_component.is_empty() => { + query.push_str(" FOR"); + for (i, schema) in database_component.iter().enumerate() { + if schema.table_to_track.is_empty() { + query.push_str(" TABLES IN SCHEMA "); + query.push_str("e_identifier(&schema.schema_name)); + } else { + query.push_str(" TABLE ONLY "); + for (j, table) in schema.table_to_track.iter().enumerate() { + let table_name = quote_identifier(&table.table_name); + let schema_name = quote_identifier(&schema.schema_name); + let full_name = format!("{}.{}", &schema_name, &table_name); + query.push_str(&full_name); + if !table.columns_name.is_empty() { + query.push_str(" ("); + let columns = table + .columns_name + .iter() + .map(|column| quote_identifier(column)) + .join(", "); + query.push_str(&columns); + query.push_str(")"); + } + + if let Some(where_clause) = &table.where_clause { + query.push_str(" WHERE ("); + query.push_str(where_clause); + query.push(')'); + } + + if j + 1 != schema.table_to_track.len() { + query.push_str(", "); + } + } + } + if i < database_component.len() - 1 { + query.push_str(", "); + } + } + } + _ => { + query.push_str(" FOR ALL TABLES "); + } + }; + + if !transaction_to_track.is_empty() { + let transactions = || transaction_to_track.iter().join(", "); + query.push_str(" WITH (publish = '"); + query.push_str(&transactions()); + query.push_str("');"); + } + + query +} + +pub fn drop_publication_query(publication_name: &str) -> String { + let mut query = String::from("DROP PUBLICATION IF EXISTS "); + let quoted_publication_name = quote_identifier(publication_name); + query.push_str("ed_publication_name); + query.push_str(";"); + query +} + +pub fn drop_logical_replication_slot_query(replication_slot_name: &str) -> String { + format!( + "SELECT pg_drop_replication_slot({});", + quote_literal(&replication_slot_name) + ) +} + +pub fn generate_random_string() -> String { + let timestamp = Utc::now().timestamp_millis().to_string(); + let mut rng = rand::rng(); + let charset = "abcdefghijklmnopqrstuvwxyz0123456789"; + + let random_part = (0..10) + .map(|_| { + charset + .chars() + .nth(rng.random_range(0..charset.len())) + .unwrap() + }) + .collect::(); + + format!("{}_{}", timestamp, random_part) +} + pub async fn get_database_resource( authed: ApiAuthed, user_db: Option, @@ -87,6 +256,7 @@ fn slot_service() -> Router { pub fn workspaced_service() -> Router { Router::new() + .route("/test", post(test_postgres_connection)) .route("/create", post(create_postgres_trigger)) .route("/list", get(list_postgres_triggers)) .route("/get/*path", get(get_postgres_trigger)) diff --git a/backend/windmill-api/src/postgres_triggers/trigger.rs b/backend/windmill-api/src/postgres_triggers/trigger.rs index ea0f9a5dff76b..45be9f6935ca1 100644 --- a/backend/windmill-api/src/postgres_triggers/trigger.rs +++ b/backend/windmill-api/src/postgres_triggers/trigger.rs @@ -1,7 +1,8 @@ use std::{collections::HashMap, pin::Pin}; use crate::{ - db::DB, + capture::{insert_capture_payload, PostgresTriggerConfig, 
TriggerKind}, + db::{ApiAuthed, DB}, postgres_triggers::{ get_database_resource, relation::RelationConverter, @@ -21,13 +22,20 @@ use pg_escape::{quote_identifier, quote_literal}; use rand::seq::SliceRandom; use rust_postgres::{config::SslMode, Client, Config, CopyBothDuplex, SimpleQueryMessage}; use rust_postgres_native_tls::MakeTlsConnector; +use serde::Deserialize; +use serde_json::value::RawValue; +use sqlx::types::Json as SqlxJson; + use windmill_common::{ - db::UserDB, utils::report_critical_error, worker::to_raw_value, INSTANCE_NAME, + db::UserDB, error, utils::report_critical_error, worker::to_raw_value, INSTANCE_NAME, }; +use windmill_queue::PushArgsOwned; use super::{ + drop_logical_replication_slot_query, drop_publication_query, get_database_connection, handler::{Database, PostgresTrigger}, replication_message::PrimaryKeepAliveBody, + ERROR_PUBLICATION_NAME_NOT_EXISTS, ERROR_REPLICATION_SLOT_NOT_EXISTS, }; pub struct LogicalReplicationSettings { @@ -106,6 +114,7 @@ impl PostgresSimpleClient { let connector = MakeTlsConnector::new(TlsConnector::new()?); let (client, connection) = config.connect(connector).await?; + tokio::spawn(async move { if let Err(e) = connection.await { tracing::debug!("{:#?}", e); @@ -116,6 +125,13 @@ impl PostgresSimpleClient { Ok(PostgresSimpleClient(client)) } + async fn execute_query( + &self, + query: &str, + ) -> Result, rust_postgres::Error> { + self.0.simple_query(query).await + } + async fn get_logical_replication_stream( &self, publication_name: &str, @@ -162,154 +178,405 @@ impl PostgresSimpleClient { } } -async fn update_ping( - db: &DB, - postgres_trigger: &PostgresTrigger, - error: Option<&str>, -) -> Option<()> { - let updated = sqlx::query_scalar!( - r#" - UPDATE - postgres_trigger - SET - last_server_ping = now(), - error = $1 - WHERE - workspace_id = $2 - AND path = $3 - AND server_id = $4 - AND enabled IS TRUE - RETURNING 1 - "#, - error, - &postgres_trigger.workspace_id, - &postgres_trigger.path, - *INSTANCE_NAME - ) - .fetch_optional(db) - .await; +async fn loop_ping(db: &DB, pg: &PostgresConfig, error: Option<&str>) { + loop { + if pg.update_ping(db, error).await.is_none() { + return; + } + + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } +} + +enum PostgresConfig { + Trigger(PostgresTrigger), + Capture(CaptureConfigForPostgresTrigger), +} - match updated { - Ok(updated) => { - if updated.flatten().is_none() { - // allow faster restart of database trigger - sqlx::query!( - r#" +impl PostgresTrigger { + async fn try_to_listen_to_database_transactions( + self, + db: DB, + killpill_rx: tokio::sync::broadcast::Receiver<()>, + ) -> () { + let postgres_trigger = sqlx::query_scalar!( + r#" + UPDATE postgres_trigger + SET + server_id = $1, + last_server_ping = now(), + error = 'Connecting...' 
+ WHERE + enabled IS TRUE + AND workspace_id = $2 + AND path = $3 + AND (last_server_ping IS NULL + OR last_server_ping < now() - INTERVAL '15 seconds' + ) + RETURNING true + "#, + *INSTANCE_NAME, + self.workspace_id, + self.path, + ) + .fetch_optional(&db) + .await; + match postgres_trigger { + Ok(has_lock) => { + if has_lock.flatten().unwrap_or(false) { + tracing::info!("Spawning new task to listen_to_database_transaction"); + tokio::spawn(async move { + listen_to_transactions( + PostgresConfig::Trigger(self), + db.clone(), + killpill_rx, + ) + .await; + }); + } else { + tracing::info!("Postgres trigger {} already being listened to", self.path); + } + } + Err(err) => { + tracing::error!( + "Error acquiring lock for postgres trigger {}: {:?}", + self.path, + err + ); + } + }; + } + + async fn update_ping(&self, db: &DB, error: Option<&str>) -> Option<()> { + let updated = sqlx::query_scalar!( + r#" + UPDATE + postgres_trigger + SET + last_server_ping = now(), + error = $1 + WHERE + workspace_id = $2 + AND path = $3 + AND server_id = $4 + AND enabled IS TRUE + RETURNING 1 + "#, + error, + &self.workspace_id, + &self.path, + *INSTANCE_NAME + ) + .fetch_optional(db) + .await; + + match updated { + Ok(updated) => { + if updated.flatten().is_none() { + // allow faster restart of database trigger + sqlx::query!( + r#" + UPDATE + postgres_trigger + SET + last_server_ping = NULL + WHERE + workspace_id = $1 + AND path = $2 + AND server_id IS NULL"#, + &self.workspace_id, + &self.path, + ) + .execute(db) + .await + .ok(); + tracing::info!( + "Postgres trigger {} changed, disabled, or deleted, stopping...", + self.path + ); + return None; + } + } + Err(err) => { + tracing::warn!( + "Error updating ping of postgres trigger {}: {:?}", + self.path, + err + ); + } + }; + + Some(()) + } + + async fn disable_with_error(&self, db: &DB, error: String) -> () { + match sqlx::query!( + r#" UPDATE postgres_trigger - SET + SET + enabled = FALSE, + error = $1, + server_id = NULL, last_server_ping = NULL WHERE - workspace_id = $1 - AND path = $2 - AND server_id IS NULL"#, - &postgres_trigger.workspace_id, - &postgres_trigger.path, + workspace_id = $2 AND + path = $3 + "#, + error, + self.workspace_id, + self.path, + ) + .execute(db) + .await + { + Ok(_) => { + report_critical_error( + format!( + "Disabling postgres trigger {} because of error: {}", + self.path, error + ), + db.clone(), + Some(&self.workspace_id), + None, ) - .execute(db) - .await - .ok(); - tracing::info!( - "Postgres trigger {} changed, disabled, or deleted, stopping...", - postgres_trigger.path - ); - return None; + .await; + } + Err(disable_err) => { + report_critical_error( + format!("Could not disable postgres trigger {} with err {}, disabling because of error {}", self.path, disable_err, error), + db.clone(), + Some(&self.workspace_id), + None, + ).await; } } - Err(err) => { - tracing::warn!( - "Error updating ping of postgres trigger {}: {:?}", - postgres_trigger.path, - err - ); - } - }; - - Some(()) -} - -async fn loop_ping(db: &DB, postgres_trigger: &PostgresTrigger, error: Option<&str>) { - loop { - if update_ping(db, postgres_trigger, error).await.is_none() { - return; - } + } - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + async fn fetch_authed(&self, db: &DB) -> error::Result { + fetch_api_authed( + self.edited_by.clone(), + self.email.clone(), + &self.workspace_id, + db, + Some(format!("pg-{}", self.path)), + ) + .await } -} -async fn disable_with_error(postgres_trigger: &PostgresTrigger, db: &DB, error: String) 
-> () {
-    match sqlx::query!(
-        "UPDATE postgres_trigger SET enabled = FALSE, error = $1, server_id = NULL, last_server_ping = NULL WHERE workspace_id = $2 AND path = $3",
-        error,
-        postgres_trigger.workspace_id,
-        postgres_trigger.path,
-    )
-    .execute(db).await {
-        Ok(_) => {
-            report_critical_error(format!("Disabling postgres trigger {} because of error: {}", postgres_trigger.path, error), db.clone(), Some(&postgres_trigger.workspace_id), None).await;
-        },
-        Err(disable_err) => {
+    async fn handle(
+        &self,
+        db: &DB,
+        args: Option<HashMap<String, Box<RawValue>>>,
+        extra: Option<HashMap<String, Box<RawValue>>>,
+    ) -> () {
+        if let Err(err) = run_job(args, extra, db, self).await {
             report_critical_error(
-                format!("Could not disable postgres trigger {} with err {}, disabling because of error {}", postgres_trigger.path, disable_err, error),
+                format!(
+                    "Failed to trigger job from postgres {}: {:?}",
+                    self.path, err
+                ),
                 db.clone(),
-                Some(&postgres_trigger.workspace_id),
+                Some(&self.workspace_id),
                 None,
-            ).await;
-        }
+            )
+            .await;
+        };
     }
 }
 
-async fn listen_to_transactions(
-    postgres_trigger: &PostgresTrigger,
-    db: DB,
-    mut killpill_rx: tokio::sync::broadcast::Receiver<()>,
-) {
-    let start_logical_replication_streaming = async {
-        let authed = fetch_api_authed(
-            postgres_trigger.edited_by.clone(),
-            postgres_trigger.email.clone(),
-            &postgres_trigger.workspace_id,
-            &db,
-            None,
-        )
-        .await?;
+struct PgInfo<'a> {
+    postgres_resource_path: &'a str,
+    publication_name: &'a str,
+    replication_slot_name: &'a str,
+    workspace_id: &'a str,
+}
+
+impl PostgresConfig {
+    async fn update_ping(&self, db: &DB, error: Option<&str>) -> Option<()> {
+        match self {
+            PostgresConfig::Trigger(trigger) => trigger.update_ping(db, error).await,
+            PostgresConfig::Capture(capture) => capture.update_ping(db, error).await,
+        }
+    }
+
+    async fn disable_with_error(&self, db: &DB, error: String) -> () {
+        match self {
+            PostgresConfig::Trigger(trigger) => trigger.disable_with_error(&db, error).await,
+            PostgresConfig::Capture(capture) => capture.disable_with_error(db, error).await,
+        }
+    }
+
+    fn retrieve_info(&self) -> PgInfo {
+        let postgres_resource_path;
+        let publication_name;
+        let replication_slot_name;
+        let workspace_id;
+
+        match self {
+            PostgresConfig::Trigger(trigger) => {
+                postgres_resource_path = &trigger.postgres_resource_path;
+                publication_name = &trigger.publication_name;
+                replication_slot_name = &trigger.replication_slot_name;
+                workspace_id = &trigger.workspace_id;
+            }
+            PostgresConfig::Capture(capture) => {
+                postgres_resource_path = &capture.trigger_config.postgres_resource_path;
+                workspace_id = &capture.workspace_id;
+                publication_name = capture.trigger_config.publication_name.as_ref().unwrap();
+                replication_slot_name = capture
+                    .trigger_config
+                    .replication_slot_name
+                    .as_ref()
+                    .unwrap();
+            }
+        };
+
+        PgInfo { postgres_resource_path, replication_slot_name, workspace_id, publication_name }
+    }
+
+    async fn start_logical_replication_streaming(
+        &self,
+        db: &DB,
+    ) -> std::result::Result<(CopyBothDuplex<Bytes>, LogicalReplicationSettings), Error> {
+        let PgInfo {
+            publication_name,
+            replication_slot_name,
+            workspace_id,
+            postgres_resource_path,
+        } = self.retrieve_info();
+
+        let authed = match self {
+            PostgresConfig::Trigger(trigger) => trigger.fetch_authed(db).await?,
+            PostgresConfig::Capture(capture) => capture.fetch_authed(db).await?,
+        };
 
         let database = get_database_resource(
             authed,
             Some(UserDB::new(db.clone())),
             &db,
-            &postgres_trigger.postgres_resource_path,
-            &postgres_trigger.workspace_id,
+            postgres_resource_path,
+
workspace_id, ) .await?; let client = PostgresSimpleClient::new(&database).await?; + let publication = client + .execute_query(&format!( + "SELECT pubname FROM pg_publication WHERE pubname = {}", + quote_literal(&publication_name) + )) + .await?; + + if !publication.row_exist() { + return Err(Error::Common(error::Error::BadConfig( + ERROR_PUBLICATION_NAME_NOT_EXISTS.to_string(), + ))); + } + + let replication_slot = client + .execute_query(&format!( + "SELECT slot_name FROM pg_replication_slots WHERE slot_name = {}", + quote_literal(&replication_slot_name) + )) + .await?; + + if !replication_slot.row_exist() { + return Err(Error::Common(error::Error::BadConfig( + ERROR_REPLICATION_SLOT_NOT_EXISTS.to_string(), + ))); + } + let (logical_replication_stream, logical_replication_settings) = client - .get_logical_replication_stream( - &postgres_trigger.publication_name, - &postgres_trigger.replication_slot_name, - ) + .get_logical_replication_stream(&publication_name, &replication_slot_name) .await?; - Ok::<_, Error>((logical_replication_stream, logical_replication_settings)) - }; + Ok((logical_replication_stream, logical_replication_settings)) + } + + fn get_path(&self) -> &str { + match self { + PostgresConfig::Trigger(trigger) => &trigger.path, + PostgresConfig::Capture(capture) => &capture.path, + } + } + + async fn handle( + &self, + db: &DB, + args: Option>>, + extra: Option>>, + ) -> () { + match self { + PostgresConfig::Trigger(trigger) => trigger.handle(&db, args, extra).await, + PostgresConfig::Capture(capture) => capture.handle(&db, args, extra).await, + } + } + + async fn cleanup(&self, db: &DB) -> Result<(), Error> { + match self { + PostgresConfig::Trigger(_) => Ok(()), + PostgresConfig::Capture(capture) => { + let publication_name = capture.trigger_config.publication_name.as_ref().unwrap(); + let replication_slot_name = capture + .trigger_config + .replication_slot_name + .as_ref() + .unwrap(); + let postgres_resource_path = &capture.trigger_config.postgres_resource_path; + let workspace_id = &capture.workspace_id; + let authed = capture.fetch_authed(&db).await?; + + let user_db = UserDB::new(db.clone()); + + let mut connection = get_database_connection( + authed.clone(), + Some(user_db.clone()), + &db, + postgres_resource_path, + workspace_id, + ) + .await?; + + let query = drop_logical_replication_slot_query(replication_slot_name); + + let _ = sqlx::query(&query).execute(&mut connection).await; + + let query = drop_publication_query(publication_name); + + let _ = sqlx::query(&query).execute(&mut connection).await; + + Ok(()) + } + } + } +} + +async fn listen_to_transactions( + pg: PostgresConfig, + db: DB, + mut killpill_rx: tokio::sync::broadcast::Receiver<()>, +) { tokio::select! { biased; _ = killpill_rx.recv() => { + let _ = pg.cleanup(&db).await; return; } - _ = loop_ping(&db, postgres_trigger, Some("Connecting...")) => { + _ = loop_ping(&db, &pg, Some("Connecting...")) => { + let _ = pg.cleanup(&db).await; return; } - result = start_logical_replication_streaming => { + result = pg.start_logical_replication_streaming(&db) => { tokio::select! 
{ biased; _ = killpill_rx.recv() => { + let _ = pg.cleanup(&db).await; return; } - _ = loop_ping(&db, postgres_trigger, None) => { + _ = loop_ping(&db, &pg, None) => { + let _ = pg.cleanup(&db).await; return; } _ = { @@ -318,15 +585,15 @@ async fn listen_to_transactions( Ok((logical_replication_stream, logical_replication_settings)) => { pin_mut!(logical_replication_stream); let mut relations = RelationConverter::new(); - tracing::info!("Starting to listen for postgres trigger {}", postgres_trigger.path); + tracing::info!("Starting to listen for postgres trigger {}", pg.get_path()); loop { let message = logical_replication_stream.next().await; let message = match message { Some(message) => message, None => { - tracing::error!("Stream for postgres trigger {} closed", postgres_trigger.path); - if let None = update_ping(&db, postgres_trigger, Some("Stream closed")).await { + tracing::error!("Stream for postgres trigger {} closed", pg.get_path()); + if let None = pg.update_ping(&db, Some("Stream closed")).await { return; } return; @@ -336,8 +603,8 @@ async fn listen_to_transactions( let message = match message { Ok(message) => message, Err(err) => { - let err = format!("Postgres trigger named {} had an error while receiving a message : {}", &postgres_trigger.path, err.to_string()); - disable_with_error(&postgres_trigger, &db, err).await; + let err = format!("Postgres trigger named {} had an error while receiving a message : {}", pg.get_path(), err.to_string()); + pg.disable_with_error(&db, err).await; return; } }; @@ -345,8 +612,8 @@ async fn listen_to_transactions( let logical_message = match ReplicationMessage::parse(message) { Ok(logical_message) => logical_message, Err(err) => { - let err = format!("Postgres trigger named: {} had an error while parsing message: {}", postgres_trigger.path, err.to_string()); - disable_with_error(&postgres_trigger, &db, err).await; + let err = format!("Postgres trigger named: {} had an error while parsing message: {}", pg.get_path(), err.to_string()); + pg.disable_with_error(&db, err).await; return; } }; @@ -362,7 +629,7 @@ async fn listen_to_transactions( let logical_replication_message = match x_log_data.parse(&logical_replication_settings) { Ok(logical_replication_message) => logical_replication_message, Err(err) => { - tracing::error!("Postgres trigger named: {} had an error while trying to parse incomming stream message: {}", &postgres_trigger.path, err.to_string()); + tracing::error!("Postgres trigger named: {} had an error while trying to parse incomming stream message: {}", pg.get_path(), err.to_string()); continue; } }; @@ -390,7 +657,7 @@ async fn listen_to_transactions( let relation = match relations.get_relation(o_id) { Ok(relation) => relation, Err(err) => { - tracing::error!("Postgres trigger named: {}, error: {}", &postgres_trigger.path, err.to_string()); + tracing::error!("Postgres trigger named: {}, error: {}", pg.get_path(), err.to_string()); continue; } }; @@ -404,7 +671,9 @@ async fn listen_to_transactions( "wm_trigger".to_string(), to_raw_value(&serde_json::json!({"kind": "postgres", })), )])); - let _ = run_job(Some(database_info), extra, &db, postgres_trigger).await; + + + let _ = pg.handle(&db, Some(database_info), extra).await; } } @@ -413,11 +682,12 @@ async fn listen_to_transactions( } Err(err) => { tracing::error!("Postgres trigger error while trying to start logical replication streaming: {}", &err); - disable_with_error(&postgres_trigger, &db, err.to_string()).await + pg.disable_with_error(&db, err.to_string()).await } } } } => { 
+                let _ = pg.cleanup(&db).await;
                 return;
             }
         }
@@ -425,55 +695,204 @@
     }
 }
 
-async fn try_to_listen_to_database_transactions(
-    pg_trigger: PostgresTrigger,
-    db: DB,
-    killpill_rx: tokio::sync::broadcast::Receiver<()>,
-) {
-    let postgres_trigger = sqlx::query_scalar!(
-        r#"
-        UPDATE postgres_trigger
-        SET
-            server_id = $1,
-            last_server_ping = now(),
-            error = 'Connecting...'
-        WHERE
-            enabled IS TRUE
-            AND workspace_id = $2
-            AND path = $3
-            AND (last_server_ping IS NULL
-                OR last_server_ping < now() - INTERVAL '15 seconds'
-            )
-        RETURNING true
+#[derive(Deserialize)]
+struct CaptureConfigForPostgresTrigger {
+    trigger_config: SqlxJson<PostgresTriggerConfig>,
+    path: String,
+    is_flow: bool,
+    workspace_id: String,
+    owner: String,
+    email: String,
+}
+
+impl CaptureConfigForPostgresTrigger {
+    async fn try_to_listen_to_database_transactions(
+        self,
+        db: DB,
+        killpill_rx: tokio::sync::broadcast::Receiver<()>,
+    ) -> () {
+        match sqlx::query_scalar!(
+            r#"
+            UPDATE
+                capture_config
+            SET
+                server_id = $1,
+                last_server_ping = now(),
+                error = 'Connecting...'
+            WHERE
+                last_client_ping > NOW() - INTERVAL '10 seconds' AND
+                workspace_id = $2 AND
+                path = $3 AND
+                is_flow = $4 AND
+                trigger_kind = 'postgres' AND
+                (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds')
+            RETURNING true
+            "#,
+            *INSTANCE_NAME,
+            self.workspace_id,
+            self.path,
+            self.is_flow,
+        )
+        .fetch_optional(&db)
+        .await
+        {
+            Ok(has_lock) => {
+                if has_lock.flatten().unwrap_or(false) {
+                    tokio::spawn(listen_to_transactions(
+                        PostgresConfig::Capture(self),
+                        db,
+                        killpill_rx,
+                    ));
+                } else {
+                    tracing::info!("Postgres {} already being listened to", self.path);
+                }
+            }
+            Err(err) => {
+                tracing::error!(
+                    "Error acquiring lock for capture postgres {}: {:?}",
+                    self.path,
+                    err
+                );
+            }
+        };
+    }
+
+    async fn update_ping(&self, db: &DB, error: Option<&str>) -> Option<()> {
+        match sqlx::query_scalar!(
+            r#"
+            UPDATE
+                capture_config
+            SET
+                last_server_ping = now(),
+                error = $1
+            WHERE
+                workspace_id = $2 AND
+                path = $3 AND
+                is_flow = $4 AND
+                trigger_kind = 'postgres' AND
+                server_id = $5 AND
+                last_client_ping > NOW() - INTERVAL '10 seconds'
+            RETURNING 1
         "#,
-        *INSTANCE_NAME,
-        pg_trigger.workspace_id,
-        pg_trigger.path,
-    )
-    .fetch_optional(&db)
-    .await;
-    match postgres_trigger {
-        Ok(has_lock) => {
-            if has_lock.flatten().unwrap_or(false) {
-                tracing::info!("Spawning new task to listen_to_database_transaction");
-                tokio::spawn(async move {
-                    listen_to_transactions(&pg_trigger, db.clone(), killpill_rx).await;
-                });
-            } else {
-                tracing::info!(
-                    "Postgres trigger {} already being listened to",
-                    pg_trigger.path
+            error,
+            self.workspace_id,
+            self.path,
+            self.is_flow,
+            *INSTANCE_NAME
+        )
+        .fetch_optional(db)
+        .await
+        {
+            Ok(updated) => {
+                if updated.flatten().is_none() {
+                    // allow faster restart of postgres capture
+                    sqlx::query!(
+                        r#"UPDATE
+                            capture_config
+                        SET
+                            last_server_ping = NULL
+                        WHERE
+                            workspace_id = $1 AND
+                            path = $2 AND
+                            is_flow = $3 AND
+                            trigger_kind = 'postgres' AND
+                            server_id IS NULL
+                        "#,
+                        self.workspace_id,
+                        self.path,
+                        self.is_flow,
+                    )
+                    .execute(db)
+                    .await
+                    .ok();
+                    tracing::info!(
+                        "Postgres capture {} changed, disabled, or deleted, stopping...",
+                        self.path
+                    );
+                    return None;
+                }
+            }
+            Err(err) => {
+                tracing::warn!(
+                    "Error updating ping of capture postgres {}: {:?}",
+                    self.path,
+                    err
                 );
             }
+        };
+
+        Some(())
+    }
+
+    async fn fetch_authed(&self, db: &DB) -> error::Result<ApiAuthed> {
+        fetch_api_authed(
+            self.owner.clone(),
+
self.email.clone(), + &self.workspace_id, + db, + Some(format!("postgres-{}", self.get_trigger_path())), + ) + .await + } + + fn get_trigger_path(&self) -> String { + format!( + "{}-{}", + if self.is_flow { "flow" } else { "script" }, + self.path + ) + } + + async fn disable_with_error(&self, db: &DB, error: String) -> () { + if let Err(err) = sqlx::query!( + r#" + UPDATE + capture_config + SET + error = $1, + server_id = NULL, + last_server_ping = NULL + WHERE + workspace_id = $2 AND + path = $3 AND + is_flow = $4 AND + trigger_kind = 'postgres' + "#, + error, + self.workspace_id, + self.path, + self.is_flow, + ) + .execute(db) + .await + { + tracing::error!("Could not disable postgres capture {} ({}) with err {}, disabling because of error {}", self.path, self.workspace_id, err, error); } - Err(err) => { - tracing::error!( - "Error acquiring lock for postgres trigger {}: {:?}", - pg_trigger.path, - err - ); + } + + async fn handle( + &self, + db: &DB, + args: Option>>, + extra: Option>>, + ) -> () { + let args = PushArgsOwned { args: args.unwrap_or_default(), extra }; + let extra = args.extra.as_ref().map(to_raw_value); + if let Err(err) = insert_capture_payload( + db, + &self.workspace_id, + &self.path, + self.is_flow, + &TriggerKind::Postgres, + args, + extra, + &self.owner, + ) + .await + { + tracing::error!("Error inserting capture payload: {:?}", err); } - }; + } } async fn listen_to_unlistened_database_events( @@ -515,18 +934,51 @@ async fn listen_to_unlistened_database_events( Ok(mut triggers) => { triggers.shuffle(&mut rand::rng()); for trigger in triggers { - try_to_listen_to_database_transactions( - trigger, - db.clone(), - killpill_rx.resubscribe(), - ) - .await; + trigger + .try_to_listen_to_database_transactions(db.clone(), killpill_rx.resubscribe()) + .await; } } Err(err) => { tracing::error!("Error fetching postgres triggers: {:?}", err); } }; + + let postgres_triggers_capture = sqlx::query_as!( + CaptureConfigForPostgresTrigger, + r#" + SELECT + path, + is_flow, + workspace_id, + owner, + email, + trigger_config as "trigger_config!: _" + FROM + capture_config + WHERE + trigger_kind = 'postgres' AND + last_client_ping > NOW() - INTERVAL '10 seconds' AND + trigger_config IS NOT NULL AND + (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') + "# + ) + .fetch_all(db) + .await; + + match postgres_triggers_capture { + Ok(mut captures) => { + captures.shuffle(&mut rand::rng()); + for capture in captures { + capture + .try_to_listen_to_database_transactions(db.clone(), killpill_rx.resubscribe()) + .await; + } + } + Err(err) => { + tracing::error!("Error fetching captures postgres triggers: {:?}", err); + } + }; } pub fn start_database(db: DB, mut killpill_rx: tokio::sync::broadcast::Receiver<()>) { diff --git a/backend/windmill-api/src/workspaces_export.rs b/backend/windmill-api/src/workspaces_export.rs index 40a80025ea976..c9f15306b3937 100644 --- a/backend/windmill-api/src/workspaces_export.rs +++ b/backend/windmill-api/src/workspaces_export.rs @@ -336,8 +336,8 @@ pub(crate) async fn tarball_workspace( { let scripts = sqlx::query_as::<_, Script>( "SELECT * FROM script as o WHERE workspace_id = $1 AND archived = false - AND created_at = (select max(created_at) from script where path = o.path AND \ - workspace_id = $1)", + AND created_at = (select max(created_at) from script where path = o.path AND \ + workspace_id = $1)", ) .bind(&w_id) .fetch_all(&mut *tx) @@ -413,12 +413,12 @@ pub(crate) async fn tarball_workspace( if !skip_resources.unwrap_or(false) 
{ let resources = sqlx::query_as!( - Resource, - "SELECT * FROM resource WHERE workspace_id = $1 AND resource_type != 'state' AND resource_type != 'cache'", - &w_id - ) - .fetch_all(&mut *tx) - .await?; + Resource, + "SELECT * FROM resource WHERE workspace_id = $1 AND resource_type != 'state' AND resource_type != 'cache'", + &w_id + ) + .fetch_all(&mut *tx) + .await?; for resource in resources { let resource_str = &to_string_without_metadata(&resource, false, None).unwrap(); @@ -450,14 +450,14 @@ pub(crate) async fn tarball_workspace( { let flows = sqlx::query_as::<_, Flow>( - "SELECT flow.workspace_id, flow.path, flow.summary, flow.description, flow.archived, flow.extra_perms, flow.draft_only, flow.dedicated_worker, flow.tag, flow.ws_error_handler_muted, flow.timeout, flow.visible_to_runner_only, flow.on_behalf_of_email, flow_version.schema, flow_version.value, flow_version.created_at as edited_at, flow_version.created_by as edited_by - FROM flow - LEFT JOIN flow_version ON flow_version.id = flow.versions[array_upper(flow.versions, 1)] - WHERE flow.workspace_id = $1 AND flow.archived = false", - ) - .bind(&w_id) - .fetch_all(&mut *tx) - .await?; + "SELECT flow.workspace_id, flow.path, flow.summary, flow.description, flow.archived, flow.extra_perms, flow.draft_only, flow.dedicated_worker, flow.tag, flow.ws_error_handler_muted, flow.timeout, flow.visible_to_runner_only, flow.on_behalf_of_email, flow_version.schema, flow_version.value, flow_version.created_at as edited_at, flow_version.created_by as edited_by + FROM flow + LEFT JOIN flow_version ON flow_version.id = flow.versions[array_upper(flow.versions, 1)] + WHERE flow.workspace_id = $1 AND flow.archived = false", + ) + .bind(&w_id) + .fetch_all(&mut *tx) + .await?; for flow in flows { let flow_str = &to_string_without_metadata(&flow, false, None).unwrap(); @@ -469,14 +469,14 @@ pub(crate) async fn tarball_workspace( if !skip_variables.unwrap_or(false) { let variables = - sqlx::query_as::<_, ExportableListableVariable>(if !skip_secrets.unwrap_or(false) { - "SELECT * FROM variable WHERE workspace_id = $1 AND expires_at IS NULL" - } else { - "SELECT * FROM variable WHERE workspace_id = $1 AND is_secret = false AND expires_at IS NULL" - }) - .bind(&w_id) - .fetch_all(&mut *tx) - .await?; + sqlx::query_as::<_, ExportableListableVariable>(if !skip_secrets.unwrap_or(false) { + "SELECT * FROM variable WHERE workspace_id = $1 AND expires_at IS NULL" + } else { + "SELECT * FROM variable WHERE workspace_id = $1 AND is_secret = false AND expires_at IS NULL" + }) + .bind(&w_id) + .fetch_all(&mut *tx) + .await?; let mc = build_crypt(&db, &w_id).await?; @@ -496,14 +496,14 @@ pub(crate) async fn tarball_workspace( { let apps = sqlx::query_as::<_, AppWithLastVersion>( - "SELECT app.id, app.path, app.summary, app.versions, app.policy, app.custom_path, - app.extra_perms, app_version.value, - app_version.created_at, app_version.created_by from app, app_version - WHERE app.workspace_id = $1 AND app_version.id = app.versions[array_upper(app.versions, 1)]", - ) - .bind(&w_id) - .fetch_all(&mut *tx) - .await?; + "SELECT app.id, app.path, app.summary, app.versions, app.policy, app.custom_path, + app.extra_perms, app_version.value, + app_version.created_at, app_version.created_by from app, app_version + WHERE app.workspace_id = $1 AND app_version.id = app.versions[array_upper(app.versions, 1)]", + ) + .bind(&w_id) + .fetch_all(&mut *tx) + .await?; for app in apps { let app_str = &to_string_without_metadata(&app, false, None).unwrap(); @@ -516,7 +516,7 @@ 
pub(crate) async fn tarball_workspace( if include_schedules.unwrap_or(false) { let schedules = sqlx::query_as::<_, Schedule>( "SELECT * FROM schedule - WHERE workspace_id = $1", + WHERE workspace_id = $1", ) .bind(&w_id) .fetch_all(&mut *tx) @@ -534,13 +534,13 @@ pub(crate) async fn tarball_workspace( #[cfg(feature = "http_trigger")] { let http_triggers = sqlx::query_as!( - crate::http_triggers::HttpTrigger, - "SELECT workspace_id, path, route_path, route_path_key, script_path, is_flow, edited_by, edited_at, email, extra_perms, is_async, requires_auth, http_method as \"http_method: _\", static_asset_config as \"static_asset_config: _\", is_static_website FROM http_trigger - WHERE workspace_id = $1", - &w_id - ) - .fetch_all(&mut *tx) - .await?; + crate::http_triggers::HttpTrigger, + "SELECT workspace_id, path, route_path, route_path_key, script_path, is_flow, edited_by, edited_at, email, extra_perms, is_async, requires_auth, http_method as \"http_method: _\", static_asset_config as \"static_asset_config: _\", is_static_website FROM http_trigger + WHERE workspace_id = $1", + &w_id + ) + .fetch_all(&mut *tx) + .await?; for trigger in http_triggers { let trigger_str = &to_string_without_metadata(&trigger, false, None).unwrap(); @@ -553,13 +553,13 @@ pub(crate) async fn tarball_workspace( #[cfg(feature = "websocket")] { let websocket_triggers = sqlx::query_as!( - crate::websocket_triggers::WebsocketTrigger, - "SELECT workspace_id, path, url, script_path, is_flow, edited_by, email, edited_at, server_id, last_server_ping, extra_perms, error, enabled, filters as \"filters: _\", initial_messages as \"initial_messages: _\", url_runnable_args as \"url_runnable_args: _\", can_return_message FROM websocket_trigger - WHERE workspace_id = $1", - &w_id - ) - .fetch_all(&mut *tx) - .await?; + crate::websocket_triggers::WebsocketTrigger, + "SELECT workspace_id, path, url, script_path, is_flow, edited_by, email, edited_at, server_id, last_server_ping, extra_perms, error, enabled, filters as \"filters: _\", initial_messages as \"initial_messages: _\", url_runnable_args as \"url_runnable_args: _\", can_return_message FROM websocket_trigger + WHERE workspace_id = $1", + &w_id + ) + .fetch_all(&mut *tx) + .await?; for trigger in websocket_triggers { let trigger_str = &to_string_without_metadata(&trigger, false, None).unwrap(); @@ -577,7 +577,7 @@ pub(crate) async fn tarball_workspace( let kafka_triggers = sqlx::query_as!( crate::kafka_triggers_ee::KafkaTrigger, "SELECT * FROM kafka_trigger - WHERE workspace_id = $1", + WHERE workspace_id = $1", &w_id ) .fetch_all(&mut *tx) @@ -599,7 +599,7 @@ pub(crate) async fn tarball_workspace( let nats_triggers = sqlx::query_as!( crate::nats_triggers_ee::NatsTrigger, "SELECT * FROM nats_trigger - WHERE workspace_id = $1", + WHERE workspace_id = $1", &w_id ) .fetch_all(&mut *tx) @@ -619,7 +619,7 @@ pub(crate) async fn tarball_workspace( let postgres_triggers = sqlx::query_as!( crate::postgres_triggers::PostgresTrigger, "SELECT * FROM postgres_trigger - WHERE workspace_id = $1", + WHERE workspace_id = $1", &w_id ) .fetch_all(&mut *tx) @@ -640,7 +640,7 @@ pub(crate) async fn tarball_workspace( if include_users.unwrap_or(false) { let users = sqlx::query!( "SELECT * FROM usr - WHERE workspace_id = $1", + WHERE workspace_id = $1", &w_id ) .fetch_all(&mut *tx) @@ -668,16 +668,16 @@ pub(crate) async fn tarball_workspace( if include_groups.unwrap_or(false) { let groups = sqlx::query!( - r#"SELECT g_.workspace_id, name, summary, extra_perms, array_agg(u2g.usr) filter (where u2g.usr 
is not null) as members - FROM usr u - JOIN usr_to_group u2g ON u2g.usr = u.username AND u2g.workspace_id = u.workspace_id - RIGHT JOIN group_ g_ ON g_.workspace_id = u.workspace_id AND g_.name = u2g.group_ - WHERE g_.workspace_id = $1 AND g_.name != 'all' - GROUP BY g_.workspace_id, name, summary, extra_perms"#, - &w_id - ) - .fetch_all(&mut *tx) - .await?; + r#"SELECT g_.workspace_id, name, summary, extra_perms, array_agg(u2g.usr) filter (where u2g.usr is not null) as members + FROM usr u + JOIN usr_to_group u2g ON u2g.usr = u.username AND u2g.workspace_id = u.workspace_id + RIGHT JOIN group_ g_ ON g_.workspace_id = u.workspace_id AND g_.name = u2g.group_ + WHERE g_.workspace_id = $1 AND g_.name != 'all' + GROUP BY g_.workspace_id, name, summary, extra_perms"#, + &w_id + ) + .fetch_all(&mut *tx) + .await?; for group in groups { let extra_perms: HashMap = serde_json::from_value(group.extra_perms) @@ -728,36 +728,36 @@ pub(crate) async fn tarball_workspace( if include_settings.unwrap_or(false) { let settings = sqlx::query_as!( - SimplifiedSettings, - r#"SELECT - -- slack_team_id, - -- slack_name, - -- slack_command_script, - -- CASE WHEN slack_email = 'missing@email.xyz' THEN NULL ELSE slack_email END AS slack_email, - auto_invite_domain IS NOT NULL AS "auto_invite_enabled!", - CASE WHEN auto_invite_operator IS TRUE THEN 'operator' ELSE 'developer' END AS "auto_invite_as!", - CASE WHEN auto_add IS TRUE THEN 'add' ELSE 'invite' END AS "auto_invite_mode!", - webhook, - deploy_to, - error_handler, - ai_resource, - ai_models, - code_completion_model, - error_handler_extra_args, - error_handler_muted_on_cancel, - large_file_storage, - git_sync, - default_app, - default_scripts, - workspace.name, - mute_critical_alerts, - color, - operator_settings - FROM workspace_settings - LEFT JOIN workspace ON workspace.id = workspace_settings.workspace_id - WHERE workspace_id = $1"#, - &w_id - ).fetch_one(&mut *tx).await?; + SimplifiedSettings, + r#"SELECT + -- slack_team_id, + -- slack_name, + -- slack_command_script, + -- CASE WHEN slack_email = 'missing@email.xyz' THEN NULL ELSE slack_email END AS slack_email, + auto_invite_domain IS NOT NULL AS "auto_invite_enabled!", + CASE WHEN auto_invite_operator IS TRUE THEN 'operator' ELSE 'developer' END AS "auto_invite_as!", + CASE WHEN auto_add IS TRUE THEN 'add' ELSE 'invite' END AS "auto_invite_mode!", + webhook, + deploy_to, + error_handler, + ai_resource, + ai_models, + code_completion_model, + error_handler_extra_args, + error_handler_muted_on_cancel, + large_file_storage, + git_sync, + default_app, + default_scripts, + workspace.name, + mute_critical_alerts, + color, + operator_settings + FROM workspace_settings + LEFT JOIN workspace ON workspace.id = workspace_settings.workspace_id + WHERE workspace_id = $1"#, + &w_id + ).fetch_one(&mut *tx).await?; let settings_str = serde_json::to_value(settings) .map(|v| serde_json::to_string_pretty(&v).ok()) diff --git a/frontend/src/lib/components/Label.svelte b/frontend/src/lib/components/Label.svelte index dec8f69aae636..9f60433442a07 100644 --- a/frontend/src/lib/components/Label.svelte +++ b/frontend/src/lib/components/Label.svelte @@ -13,10 +13,12 @@
{#if !headless}
- {label} - {#if required} - - {/if} + {label} + {#if required} + + {/if} +
{/if} diff --git a/frontend/src/lib/components/Section.svelte b/frontend/src/lib/components/Section.svelte index d4babb09c1d0d..ef98239897c73 100644 --- a/frontend/src/lib/components/Section.svelte +++ b/frontend/src/lib/components/Section.svelte @@ -46,6 +46,8 @@ {#if tooltip} {tooltip} + {:else if $$slots.tooltip} + {/if} {#if eeOnly} {#if !$enterpriseLicense} diff --git a/frontend/src/lib/components/ShareModal.svelte b/frontend/src/lib/components/ShareModal.svelte index 1724003f93947..fe6dcda387047 100644 --- a/frontend/src/lib/components/ShareModal.svelte +++ b/frontend/src/lib/components/ShareModal.svelte @@ -28,6 +28,7 @@ | 'websocket_trigger' | 'kafka_trigger' | 'nats_trigger' + | 'postgres_trigger' let kind: Kind let path: string = '' diff --git a/frontend/src/lib/components/details/DetailPageTriggerPanel.svelte b/frontend/src/lib/components/details/DetailPageTriggerPanel.svelte index e08f338c6ee90..3bf60b99ab46c 100644 --- a/frontend/src/lib/components/details/DetailPageTriggerPanel.svelte +++ b/frontend/src/lib/components/details/DetailPageTriggerPanel.svelte @@ -7,7 +7,10 @@ Terminal, Webhook, Unplug, - PlugZap + PlugZap, + + Database + } from 'lucide-svelte' import HighlightTheme from '../HighlightTheme.svelte' @@ -69,7 +72,7 @@ - + Postgres diff --git a/frontend/src/lib/components/schema/AddPropertyFormV2.svelte b/frontend/src/lib/components/schema/AddPropertyFormV2.svelte index c71fdcf6bf029..8748fc8e3eef7 100644 --- a/frontend/src/lib/components/schema/AddPropertyFormV2.svelte +++ b/frontend/src/lib/components/schema/AddPropertyFormV2.svelte @@ -5,6 +5,7 @@ import Popover from '$lib/components/meltComponents/Popover.svelte' let name: string = '' + export let customName: string | undefined = undefined const dispatch = createEventDispatcher() @@ -24,7 +25,7 @@
{ if (event.key === 'Enter') { addField() @@ -44,7 +45,7 @@ disabled={!name} shortCut={{ Icon: CornerDownLeft, withoutModifier: true }} > - Add field + Add {customName ? customName.toLowerCase() : 'field'}
diff --git a/frontend/src/lib/components/triggers.ts b/frontend/src/lib/components/triggers.ts index 03530d0e98a3c..ac4a905da59cc 100644 --- a/frontend/src/lib/components/triggers.ts +++ b/frontend/src/lib/components/triggers.ts @@ -65,6 +65,8 @@ export function captureTriggerKindToTriggerKind(kind: CaptureTriggerKind): Trigg return 'kafka' case 'nats': return 'nats' + case 'postgres': + return 'postgres' default: throw new Error(`Unknown CaptureTriggerKind: ${kind}`) } diff --git a/frontend/src/lib/components/triggers/CaptureButton.svelte b/frontend/src/lib/components/triggers/CaptureButton.svelte index 045f72ea378ac..b90070d2c13b0 100644 --- a/frontend/src/lib/components/triggers/CaptureButton.svelte +++ b/frontend/src/lib/components/triggers/CaptureButton.svelte @@ -1,7 +1,7 @@ -
- -
+{#if !noButton} +
+ +
+{/if} diff --git a/frontend/src/lib/components/triggers/TriggersEditor.svelte b/frontend/src/lib/components/triggers/TriggersEditor.svelte index 29d739036b958..68bf72a88b1f3 100644 --- a/frontend/src/lib/components/triggers/TriggersEditor.svelte +++ b/frontend/src/lib/components/triggers/TriggersEditor.svelte @@ -140,7 +140,18 @@
{:else if $selectedTrigger === 'postgres'}
- +
{:else if $selectedTrigger === 'kafka' || $selectedTrigger === 'nats'}
diff --git a/frontend/src/lib/components/triggers/TriggersEditorSection.svelte b/frontend/src/lib/components/triggers/TriggersEditorSection.svelte index f684bf69a0b8f..8c01f3468ea4b 100644 --- a/frontend/src/lib/components/triggers/TriggersEditorSection.svelte +++ b/frontend/src/lib/components/triggers/TriggersEditorSection.svelte @@ -31,7 +31,8 @@ webhook: 'Webhook', kafka: '+ New Kafka trigger', email: 'Email trigger', - nats: '+ New NATS trigger' + nats: '+ New NATS trigger', + postgres: '+ New Postgres trigger' } const { captureOn } = getContext('TriggerContext') diff --git a/frontend/src/lib/components/triggers/TriggersWrapper.svelte b/frontend/src/lib/components/triggers/TriggersWrapper.svelte index 90db46c6d9373..150414795a12a 100644 --- a/frontend/src/lib/components/triggers/TriggersWrapper.svelte +++ b/frontend/src/lib/components/triggers/TriggersWrapper.svelte @@ -8,6 +8,7 @@ import EmailTriggerConfigSection from '../details/EmailTriggerConfigSection.svelte' import KafkaTriggersConfigSection from './kafka/KafkaTriggersConfigSection.svelte' import NatsTriggersConfigSection from './nats/NatsTriggersConfigSection.svelte' + import PostgresEditorConfigSection from './postgres/PostgresEditorConfigSection.svelte' export let triggerType: CaptureTriggerKind = 'webhook' export let cloudDisabled: boolean = false @@ -30,6 +31,14 @@ bind:url_runnable_args={args.url_runnable_args} showCapture={false} /> + {:else if triggerType === 'postgres'} + {:else if triggerType === 'webhook'} + import { Button } from '$lib/components/common' + import Tooltip from '$lib/components/Tooltip.svelte' + import { PostgresTriggerService } from '$lib/gen' + import { workspaceStore } from '$lib/stores' + import { sendUserToast } from '$lib/toast' + import { emptyString } from '$lib/utils' + + let loadingConfiguration = false + + const checkDatabaseConfiguration = async () => { + if (emptyString(postgres_resource_path)) { + sendUserToast('You must first pick a database resource', true) + return + } + try { + const invalidConfig = !(await PostgresTriggerService.isValidPostgresConfiguration({ + workspace: $workspaceStore!, + path: postgres_resource_path + })) + + let msg = 'Database is in logical mode. Triggers can be used.' + + if (invalidConfig) { + msg = + 'Database is NOT in logical mode. Triggers cannot be used. Refer to the PostgreSQL documentation for configuration requirements.' + } + + sendUserToast(msg, invalidConfig) + } catch (error) { + sendUserToast(error.body, true) + } + + loadingConfiguration = false + } + + const checkConnectionAndDatabaseConfiguration = async () => { + try { + loadingConfiguration = true + if (checkConnection) { + await checkConnection() + } + await checkDatabaseConfiguration() + } catch (error) { + sendUserToast(error.body, true) + } + loadingConfiguration = false + } + + export let can_write: boolean + export let postgres_resource_path: string + export let checkConnection: any | undefined = undefined + + console.log('dbg check connection', checkConnection) + + +{#if postgres_resource_path} +
+ +
+{/if} diff --git a/frontend/src/lib/components/triggers/postgres/PostgresEditorConfigSection.svelte b/frontend/src/lib/components/triggers/postgres/PostgresEditorConfigSection.svelte new file mode 100644 index 0000000000000..d6a63405846d3 --- /dev/null +++ b/frontend/src/lib/components/triggers/postgres/PostgresEditorConfigSection.svelte @@ -0,0 +1,158 @@ + + +
+ {#if showCapture && captureInfo} + + {/if} +
+
+
+

+ Pick a database to connect to +

+ { + if (emptyString(postgres_resource_path)) { + selectedTable = 'specific' + publication = { ...DEFAULT_PUBLICATION } + } + }} + /> + {#if postgres_resource_path} + + + {/if} +
+ {#if postgres_resource_path} + + + {/if} +
+
+
diff --git a/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditor.svelte b/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditor.svelte index d5f410e594588..9b46ac2b0baa8 100644 --- a/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditor.svelte +++ b/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditor.svelte @@ -9,10 +9,10 @@ drawer?.openEdit(ePath, isFlow) } - export async function openNew(is_flow: boolean, initial_script_path?: string) { + export async function openNew(is_flow: boolean, initial_script_path?: string, defaultValues?: Record) { open = true await tick() - drawer?.openNew(is_flow, initial_script_path) + drawer?.openNew(is_flow, initial_script_path, defaultValues) } let drawer: PostgresTriggerEditorInner diff --git a/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditorInner.svelte b/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditorInner.svelte index 14310d1e9d5c2..71919774c9a52 100644 --- a/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditorInner.svelte +++ b/frontend/src/lib/components/triggers/postgres/PostgresTriggerEditorInner.svelte @@ -10,7 +10,7 @@ import { canWrite, emptyString, emptyStringTrimmed, sendUserToast } from '$lib/utils' import { createEventDispatcher } from 'svelte' import Section from '$lib/components/Section.svelte' - import { Loader2, Save } from 'lucide-svelte' + import { Loader2, Save, X } from 'lucide-svelte' import Label from '$lib/components/Label.svelte' import Toggle from '$lib/components/Toggle.svelte' import ResourcePicker from '$lib/components/ResourcePicker.svelte' @@ -25,6 +25,8 @@ import Tabs from '$lib/components/common/tabs/Tabs.svelte' import Tab from '$lib/components/common/tabs/Tab.svelte' import RelationPicker from './RelationPicker.svelte' + import { invalidRelations } from './utils' + import CheckPostgresRequirement from './CheckPostgresRequirement.svelte' let drawer: Drawer let is_flow: boolean = false @@ -53,13 +55,7 @@ let publicationItems: string[] = [] let transactionType: string[] = ['Insert', 'Update', 'Delete'] let selectedTable: 'all' | 'specific' = 'specific' - let tab: 'advanced' | 'basic' - let config: { isLogical: boolean; show: boolean } = { isLogical: false, show: false } - let loadingConfiguration = false - $: table_to_track = selectedTable === 'all' ? [] : relations - $: if (postgres_resource_path === undefined) { - config.show = false - } + let tab: 'advanced' | 'basic' = 'basic' async function createPublication() { try { const message = await PostgresTriggerService.createPostgresPublication({ @@ -68,7 +64,7 @@ workspace: $workspaceStore!, requestBody: { transaction_to_track: transaction_to_track, - table_to_track + table_to_track: relations } }) @@ -107,7 +103,6 @@ dirtyPath = false selectedPublicationAction = 'get' selectedSlotAction = 'get' - config.show = false selectedPublicationAction = selectedPublicationAction selectedSlotAction = selectedSlotAction relations = [] @@ -121,7 +116,11 @@ } } - export async function openNew(nis_flow: boolean, fixedScriptPath_?: string) { + export async function openNew( + nis_flow: boolean, + fixedScriptPath_?: string, + defaultValues?: Record + ) { drawerLoading = true try { selectedPublicationAction = 'create' @@ -137,16 +136,17 @@ script_path = fixedScriptPath path = '' initialPath = '' - replication_slot_name = '' - publication_name = '' - postgres_resource_path = '' + postgres_resource_path = defaultValues?.postgres_resource_path ?? 
'' edit = false dirtyPath = false - config.show = false publication_name = `windmill_publication_${random_adj()}` replication_slot_name = `windmill_replication_${random_adj()}` - transaction_to_track = ['Insert', 'Update', 'Delete'] - relations = [ + transaction_to_track = defaultValues?.publication.transaction_to_track || [ + 'Insert', + 'Update', + 'Delete' + ] + relations = defaultValues?.publication.table_to_track || [ { schema_name: 'public', table_to_track: [] @@ -184,6 +184,15 @@ } async function updateTrigger(): Promise { + if ( + selectedTable === 'specific' && + invalidRelations(relations, { + showError: true, + trackSchemaTableError: true + }) === true + ) { + return + } if (edit) { await PostgresTriggerService.updatePostgresTrigger({ workspace: $workspaceStore!, @@ -200,7 +209,7 @@ tab === 'basic' ? { transaction_to_track, - table_to_track + table_to_track: relations } : undefined } @@ -219,7 +228,7 @@ publication_name: tab === 'basic' ? undefined : publication_name, publication: { transaction_to_track, - table_to_track + table_to_track: relations } } }) @@ -256,24 +265,6 @@ sendUserToast(error.body, true) } } - - const checkDatabaseConfiguration = async () => { - if (emptyString(postgres_resource_path)) { - sendUserToast('You must first pick a database resource', true) - return - } - try { - loadingConfiguration = true - config.isLogical = await PostgresTriggerService.isValidPostgresConfiguration({ - workspace: $workspaceStore!, - path: postgres_resource_path - }) - config.show = true - } catch (error) { - sendUserToast(error.body, true) - } - loadingConfiguration = false - } @@ -311,9 +302,10 @@ disabled={pathError != '' || emptyString(postgres_resource_path) || emptyString(script_path) || - ((emptyString(replication_slot_name) || emptyString(publication_name)) && - tab === 'advanced') || - (relations.length === 0 && tab === 'basic') || + (tab === 'advanced' && emptyString(replication_slot_name)) || + emptyString(publication_name) || + (selectedTable !== 'all' && tab === 'basic' && relations.length === 0) || + transaction_to_track.length === 0 || !can_write} on:click={updateTrigger} > @@ -327,74 +319,28 @@

Loading...

{:else} -
- - {#if edit} - Changes can take up to 30 seconds to take effect. - {:else} - New postgres triggers can take up to 30 seconds to start listening. - {/if} - -
+ + {#if edit} + Changes can take up to 30 seconds to take effect. + {:else} + New postgres triggers can take up to 30 seconds to start listening. + {/if} +
-
- -
- -
-

- Pick a database to connect to -

-
- - {#if postgres_resource_path} - - {#if config.show} - - {#if config.isLogical} - Your database is correctly configured with logical replication enabled. You can - proceed with using the streaming feature - {:else} - Logical replication is not enabled on your database. To use this feature, your - Postgres database must have wal_level configured as 'logical' in your - database configuration. - {/if} - - {/if} - {/if} -
-
+
-

+

Pick a script or flow to be triggered

@@ -411,6 +357,7 @@ {#if script_path === undefined && is_flow === false}
- {#if postgres_resource_path} -
-
-

- Choose which table of your database to track as well as what kind of transaction - should fire the script.
- You must pick a database resource first to make the configuration of your trigger - -

-
-

- Choose the types of database transactions that should trigger a script or flow. - You can select from Insert, Update, - Delete, or any combination of these operations to define when the - trigger should activate. -

+
+

+ Pick a database to connect to +

+
+
+ + +
+ + {#if postgres_resource_path} +
-
-

- Select the tables to track. You can choose to track - all tables in your database, - all tables within a specific schema, - specific tables in a schema, or even - specific columns of a table. Additionally, you can apply a - filter to retrieve only rows that do not match the specified criteria. -

+ ulOptionsClass={'!bg-surface !text-sm'} + ulSelectedClass="!text-sm" + outerDivClass="!bg-surface !min-h-[38px] !border-[#d1d5db]" + placeholder="Select transactions" + --sms-options-margin="4px" + --sms-open-z-index="100" + > + +
+ +
+
+ + +
-
-
- {/if} + + {/if} +
+ {/if} diff --git a/frontend/src/lib/components/triggers/postgres/PostgresTriggersPanel.svelte b/frontend/src/lib/components/triggers/postgres/PostgresTriggersPanel.svelte index 15742bd04c20d..cd707bb5d35c2 100644 --- a/frontend/src/lib/components/triggers/postgres/PostgresTriggersPanel.svelte +++ b/frontend/src/lib/components/triggers/postgres/PostgresTriggersPanel.svelte @@ -1,29 +1,48 @@ -
- - - - +
+
+
+ { + if (selectedTable === 'all') { + cached = relations + relations = [] + } else { + relations = cached + } + }} + bind:selected={selectedTable} + > + + + +
+
{#if selectedTable !== 'all'} {#if relations && relations.length > 0} - {#each relations as v, i} -
-
- - {#each v.table_to_track as table_to_track, j} -
-
- - -
- {/each} + {/each} +
{/if} -
- + + + +
{/if}
diff --git a/frontend/src/lib/components/triggers/postgres/utils.ts b/frontend/src/lib/components/triggers/postgres/utils.ts new file mode 100644 index 0000000000000..6b4e3e91b52b7 --- /dev/null +++ b/frontend/src/lib/components/triggers/postgres/utils.ts @@ -0,0 +1,102 @@ +import type { Relations } from '$lib/gen' +import { sendUserToast } from '$lib/toast' +import { emptyString } from '$lib/utils' + +type RelationError = { + schemaIndex: number + tableIndex: number + schemaError: boolean + tableError: boolean + schemaName?: string + trackAllTablesInSchema: boolean + trackSpecificColumnsInTable: boolean + duplicateSchemaName: boolean | undefined +} +export function invalidRelations( + relations: Relations[], + options?: { + trackSchemaTableError?: boolean + showError?: boolean + } +): boolean { + let error: RelationError = { + schemaIndex: -1, + tableIndex: -1, + schemaError: false, + tableError: false, + trackAllTablesInSchema: false, + trackSpecificColumnsInTable: false, + duplicateSchemaName: undefined + } + + const duplicateName: Set = new Set() + for (const [schemaIndex, relation] of relations.entries()) { + error.schemaIndex = schemaIndex + 1 + error.schemaName = relation.schema_name + if (emptyString(relation.schema_name)) { + error.schemaError = true + break + } else { + if (duplicateName.has(relation.schema_name)) { + error.duplicateSchemaName = true + break + } + duplicateName.add(relation.schema_name) + const tableToTrack = relation.table_to_track + if (tableToTrack.length > 0) { + for (const [tableIndex, table] of tableToTrack.entries()) { + if (emptyString(table.table_name)) { + error.tableError = true + error.tableIndex = tableIndex + 1 + break + } + if ( + !error.trackSpecificColumnsInTable && + table.columns_name && + table.columns_name.length > 0 + ) { + error.trackSpecificColumnsInTable = true + } + } + if (error.tableError) { + break + } + } else if (!error.trackAllTablesInSchema) { + error.trackAllTablesInSchema = true + } + + if ( + options?.trackSchemaTableError && + error.trackAllTablesInSchema && + error.trackSpecificColumnsInTable + ) { + break + } + } + } + const errorFound = + error.tableError || + error.schemaError || + error.duplicateSchemaName || + ((options?.trackSchemaTableError ?? false) && + error.trackAllTablesInSchema && + error.trackSpecificColumnsInTable) + if ((options?.showError ?? false) && errorFound) { + let errorMessage: string = '' + + if (error.schemaError) { + errorMessage = `Schema Error: Please enter a name for schema number ${error.schemaIndex}` + } else if (error.tableError) { + errorMessage = `Table Error: Please enter a name for table number ${error.tableIndex} inside schema number ${error.schemaIndex}` + errorMessage += emptyString(error.schemaName) ? '' : ` named: ${error.schemaName}` + } else if (error.duplicateSchemaName) { + errorMessage = `Schema Error: schema name '${error.schemaName}' is already taken` + } else { + errorMessage = + 'Configuration Error: Schema-level tracking and specific table tracking with column selection cannot be used together. Refer to the documentation for valid configurations.' 
+ } + sendUserToast(errorMessage, true) + } + + return errorFound +} diff --git a/frontend/src/lib/script_helpers.ts b/frontend/src/lib/script_helpers.ts index a3140736260e1..3cf9735a7f1eb 100644 --- a/frontend/src/lib/script_helpers.ts +++ b/frontend/src/lib/script_helpers.ts @@ -705,7 +705,7 @@ class Nats(TypedDict): length: int class WmTrigger(TypedDict): - kind: Literal["http", "email", "webhook", "websocket", "kafka", "nats"] + kind: Literal["http", "email", "webhook", "websocket", "kafka", "nats", "postgres"] http: Http | None websocket: Websocket | None kafka: Kafka | None diff --git a/frontend/src/routes/(root)/(logged)/postgres_triggers/+page.svelte b/frontend/src/routes/(root)/(logged)/postgres_triggers/+page.svelte index c69a447f3b121..76bb1f4bace2e 100644 --- a/frontend/src/routes/(root)/(logged)/postgres_triggers/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/postgres_triggers/+page.svelte @@ -389,7 +389,7 @@ displayName: canWrite ? 'Share' : 'See Permissions', icon: Share, action: () => { - shareModal.openDrawer(path, 'websocket_trigger') + shareModal.openDrawer(path, 'postgres_trigger') } } ]}