From a6a989c778747ad0189cb7f78a11a7c77b027257 Mon Sep 17 00:00:00 2001 From: andyoneal Date: Thu, 27 Jul 2023 19:44:54 -0500 Subject: [PATCH 1/7] double quote schema and table names --- target_postgres/sinks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 7c945586..5f83e670 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -340,7 +340,7 @@ def activate_version(self, new_version: int) -> None: self.logger.info("Hard delete: %s", self.config.get("hard_delete")) if self.config["hard_delete"] is True: self.connection.execute( - f"DELETE FROM {self.full_table_name} " + f"DELETE FROM \"{self.schema_name}\".\"{self.table_name}\" " f"WHERE {self.version_column_name} <= {new_version} " f"OR {self.version_column_name} IS NULL" ) @@ -357,7 +357,7 @@ def activate_version(self, new_version: int) -> None: ) # Need to deal with the case where data doesn't exist for the version column query = sqlalchemy.text( - f"UPDATE {self.full_table_name}\n" + f"UPDATE \"{self.schema_name}\".\"{self.table_name}\"\n" f"SET {self.soft_delete_column_name} = :deletedate \n" f"WHERE {self.version_column_name} < :version " f"OR {self.version_column_name} IS NULL \n" From ec5c0ea15210d537f368a9b6b6b6e373fb7c6e82 Mon Sep 17 00:00:00 2001 From: andyoneal Date: Thu, 27 Jul 2023 23:56:06 -0500 Subject: [PATCH 2/7] run black. add test. --- target_postgres/sinks.py | 4 ++-- .../test_activate_version_uppercase_stream_name.singer | 10 ++++++++++ target_postgres/tests/test_standard_target.py | 9 +++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) create mode 100644 target_postgres/tests/data_files/test_activate_version_uppercase_stream_name.singer diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 5f83e670..80c57f9f 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -340,7 +340,7 @@ def activate_version(self, new_version: int) -> None: self.logger.info("Hard delete: %s", self.config.get("hard_delete")) if self.config["hard_delete"] is True: self.connection.execute( - f"DELETE FROM \"{self.schema_name}\".\"{self.table_name}\" " + f'DELETE FROM "{self.schema_name}"."{self.table_name}" ' f"WHERE {self.version_column_name} <= {new_version} " f"OR {self.version_column_name} IS NULL" ) @@ -357,7 +357,7 @@ def activate_version(self, new_version: int) -> None: ) # Need to deal with the case where data doesn't exist for the version column query = sqlalchemy.text( - f"UPDATE \"{self.schema_name}\".\"{self.table_name}\"\n" + f'UPDATE "{self.schema_name}"."{self.table_name}"\n' f"SET {self.soft_delete_column_name} = :deletedate \n" f"WHERE {self.version_column_name} < :version " f"OR {self.version_column_name} IS NULL \n" diff --git a/target_postgres/tests/data_files/test_activate_version_uppercase_stream_name.singer b/target_postgres/tests/data_files/test_activate_version_uppercase_stream_name.singer new file mode 100644 index 00000000..5487f90d --- /dev/null +++ b/target_postgres/tests/data_files/test_activate_version_uppercase_stream_name.singer @@ -0,0 +1,10 @@ +{"type": "SCHEMA", "stream": "Account", "schema": {"type": "object", "properties": {"Code": {"type": ["string"]}, "Name": {"type": ["null", "string"]}}}, "key_properties": ["Code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "Account", "version": 1674486431563} +{"type": "RECORD", "stream": "Account", "record": {"Code": "AF", "Name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "Account", "record": {"Code": "AN", "Name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "Account", "record": {"Code": "AS", "Name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "Account", "record": {"Code": "EU", "Name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "Account", "record": {"Code": "NA", "Name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "Account", "record": {"Code": "OC", "Name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "Account", "record": {"Code": "SA", "Name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "ACTIVATE_VERSION", "stream": "Account", "version": 1674486431563} diff --git a/target_postgres/tests/test_standard_target.py b/target_postgres/tests/test_standard_target.py index 0005e981..4ae21187 100644 --- a/target_postgres/tests/test_standard_target.py +++ b/target_postgres/tests/test_standard_target.py @@ -498,6 +498,15 @@ def test_uppercase_stream_name_with_column_alter(postgres_target): singer_file_to_target(file_name, postgres_target) +def test_activate_version_uppercase_stream_name(postgres_config_no_ssl): + """Activate Version should work with uppercase stream names""" + file_name = "test_activate_version_uppercase_stream_name.singer" + postgres_config_hard_delete = copy.deepcopy(postgres_config_no_ssl) + postgres_config_hard_delete["hard_delete"] = True + pg_hard_delete = TargetPostgres(config=postgres_config_hard_delete) + singer_file_to_target(file_name, pg_hard_delete) + + def test_postgres_ssl_no_config(postgres_config_no_ssl): """Test that connection will fail when no SSL configuration options are provided. From 4c8490cdb6e40829e1324cb8fac9bee1deaa0faa Mon Sep 17 00:00:00 2001 From: andyoneal Date: Sun, 3 Sep 2023 19:00:00 -0500 Subject: [PATCH 3/7] support sqlalchemy 2.0 --- poetry.lock | 109 ++++++++++-------- pyproject.toml | 2 +- target_postgres/connector.py | 11 +- target_postgres/sinks.py | 16 ++- target_postgres/tests/test_standard_target.py | 95 ++++++++++----- 5 files changed, 140 insertions(+), 93 deletions(-) diff --git a/poetry.lock b/poetry.lock index 4ceb0959..9d8ca40f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1437,74 +1437,81 @@ files = [ [[package]] name = "sqlalchemy" -version = "1.4.49" +version = "2.0.20" description = "Database Abstraction Library" optional = false -python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" -files = [ - {file = "SQLAlchemy-1.4.49-cp27-cp27m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:2e126cf98b7fd38f1e33c64484406b78e937b1a280e078ef558b95bf5b6895f6"}, - {file = "SQLAlchemy-1.4.49-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:03db81b89fe7ef3857b4a00b63dedd632d6183d4ea5a31c5d8a92e000a41fc71"}, - {file = "SQLAlchemy-1.4.49-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:95b9df9afd680b7a3b13b38adf6e3a38995da5e162cc7524ef08e3be4e5ed3e1"}, - {file = "SQLAlchemy-1.4.49-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a63e43bf3f668c11bb0444ce6e809c1227b8f067ca1068898f3008a273f52b09"}, - {file = "SQLAlchemy-1.4.49-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f835c050ebaa4e48b18403bed2c0fda986525896efd76c245bdd4db995e51a4c"}, - {file = "SQLAlchemy-1.4.49-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9c21b172dfb22e0db303ff6419451f0cac891d2e911bb9fbf8003d717f1bcf91"}, - {file = "SQLAlchemy-1.4.49-cp310-cp310-win32.whl", hash = "sha256:5fb1ebdfc8373b5a291485757bd6431de8d7ed42c27439f543c81f6c8febd729"}, - {file = "SQLAlchemy-1.4.49-cp310-cp310-win_amd64.whl", hash = "sha256:f8a65990c9c490f4651b5c02abccc9f113a7f56fa482031ac8cb88b70bc8ccaa"}, - {file = "SQLAlchemy-1.4.49-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:8923dfdf24d5aa8a3adb59723f54118dd4fe62cf59ed0d0d65d940579c1170a4"}, - {file = "SQLAlchemy-1.4.49-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a9ab2c507a7a439f13ca4499db6d3f50423d1d65dc9b5ed897e70941d9e135b0"}, - {file = "SQLAlchemy-1.4.49-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5debe7d49b8acf1f3035317e63d9ec8d5e4d904c6e75a2a9246a119f5f2fdf3d"}, - {file = "SQLAlchemy-1.4.49-cp311-cp311-win32.whl", hash = "sha256:82b08e82da3756765c2e75f327b9bf6b0f043c9c3925fb95fb51e1567fa4ee87"}, - {file = "SQLAlchemy-1.4.49-cp311-cp311-win_amd64.whl", hash = "sha256:171e04eeb5d1c0d96a544caf982621a1711d078dbc5c96f11d6469169bd003f1"}, - {file = "SQLAlchemy-1.4.49-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:36e58f8c4fe43984384e3fbe6341ac99b6b4e083de2fe838f0fdb91cebe9e9cb"}, - {file = "SQLAlchemy-1.4.49-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b31e67ff419013f99ad6f8fc73ee19ea31585e1e9fe773744c0f3ce58c039c30"}, - {file = "SQLAlchemy-1.4.49-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:c14b29d9e1529f99efd550cd04dbb6db6ba5d690abb96d52de2bff4ed518bc95"}, - {file = "SQLAlchemy-1.4.49-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c40f3470e084d31247aea228aa1c39bbc0904c2b9ccbf5d3cfa2ea2dac06f26d"}, - {file = "SQLAlchemy-1.4.49-cp36-cp36m-win32.whl", hash = "sha256:706bfa02157b97c136547c406f263e4c6274a7b061b3eb9742915dd774bbc264"}, - {file = "SQLAlchemy-1.4.49-cp36-cp36m-win_amd64.whl", hash = "sha256:a7f7b5c07ae5c0cfd24c2db86071fb2a3d947da7bd487e359cc91e67ac1c6d2e"}, - {file = "SQLAlchemy-1.4.49-cp37-cp37m-macosx_11_0_x86_64.whl", hash = "sha256:4afbbf5ef41ac18e02c8dc1f86c04b22b7a2125f2a030e25bbb4aff31abb224b"}, - {file = "SQLAlchemy-1.4.49-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:24e300c0c2147484a002b175f4e1361f102e82c345bf263242f0449672a4bccf"}, - {file = "SQLAlchemy-1.4.49-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:201de072b818f8ad55c80d18d1a788729cccf9be6d9dc3b9d8613b053cd4836d"}, - {file = "SQLAlchemy-1.4.49-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7653ed6817c710d0c95558232aba799307d14ae084cc9b1f4c389157ec50df5c"}, - {file = "SQLAlchemy-1.4.49-cp37-cp37m-win32.whl", hash = "sha256:647e0b309cb4512b1f1b78471fdaf72921b6fa6e750b9f891e09c6e2f0e5326f"}, - {file = "SQLAlchemy-1.4.49-cp37-cp37m-win_amd64.whl", hash = "sha256:ab73ed1a05ff539afc4a7f8cf371764cdf79768ecb7d2ec691e3ff89abbc541e"}, - {file = "SQLAlchemy-1.4.49-cp38-cp38-macosx_11_0_x86_64.whl", hash = "sha256:37ce517c011560d68f1ffb28af65d7e06f873f191eb3a73af5671e9c3fada08a"}, - {file = "SQLAlchemy-1.4.49-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a1878ce508edea4a879015ab5215546c444233881301e97ca16fe251e89f1c55"}, - {file = "SQLAlchemy-1.4.49-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:0e8e608983e6f85d0852ca61f97e521b62e67969e6e640fe6c6b575d4db68557"}, - {file = "SQLAlchemy-1.4.49-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ccf956da45290df6e809ea12c54c02ace7f8ff4d765d6d3dfb3655ee876ce58d"}, - {file = "SQLAlchemy-1.4.49-cp38-cp38-win32.whl", hash = "sha256:f167c8175ab908ce48bd6550679cc6ea20ae169379e73c7720a28f89e53aa532"}, - {file = "SQLAlchemy-1.4.49-cp38-cp38-win_amd64.whl", hash = "sha256:45806315aae81a0c202752558f0df52b42d11dd7ba0097bf71e253b4215f34f4"}, - {file = "SQLAlchemy-1.4.49-cp39-cp39-macosx_11_0_x86_64.whl", hash = "sha256:b6d0c4b15d65087738a6e22e0ff461b407533ff65a73b818089efc8eb2b3e1de"}, - {file = "SQLAlchemy-1.4.49-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a843e34abfd4c797018fd8d00ffffa99fd5184c421f190b6ca99def4087689bd"}, - {file = "SQLAlchemy-1.4.49-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:1c890421651b45a681181301b3497e4d57c0d01dc001e10438a40e9a9c25ee77"}, - {file = "SQLAlchemy-1.4.49-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d26f280b8f0a8f497bc10573849ad6dc62e671d2468826e5c748d04ed9e670d5"}, - {file = "SQLAlchemy-1.4.49-cp39-cp39-win32.whl", hash = "sha256:ec2268de67f73b43320383947e74700e95c6770d0c68c4e615e9897e46296294"}, - {file = "SQLAlchemy-1.4.49-cp39-cp39-win_amd64.whl", hash = "sha256:bbdf16372859b8ed3f4d05f925a984771cd2abd18bd187042f24be4886c2a15f"}, - {file = "SQLAlchemy-1.4.49.tar.gz", hash = "sha256:06ff25cbae30c396c4b7737464f2a7fc37a67b7da409993b182b024cec80aed9"}, +python-versions = ">=3.7" +files = [ + {file = "SQLAlchemy-2.0.20-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:759b51346aa388c2e606ee206c0bc6f15a5299f6174d1e10cadbe4530d3c7a98"}, + {file = "SQLAlchemy-2.0.20-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:1506e988ebeaaf316f183da601f24eedd7452e163010ea63dbe52dc91c7fc70e"}, + {file = "SQLAlchemy-2.0.20-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5768c268df78bacbde166b48be788b83dddaa2a5974b8810af422ddfe68a9bc8"}, + {file = "SQLAlchemy-2.0.20-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a3f0dd6d15b6dc8b28a838a5c48ced7455c3e1fb47b89da9c79cc2090b072a50"}, + {file = "SQLAlchemy-2.0.20-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:243d0fb261f80a26774829bc2cee71df3222587ac789b7eaf6555c5b15651eed"}, + {file = "SQLAlchemy-2.0.20-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:6eb6d77c31e1bf4268b4d61b549c341cbff9842f8e115ba6904249c20cb78a61"}, + {file = "SQLAlchemy-2.0.20-cp310-cp310-win32.whl", hash = "sha256:bcb04441f370cbe6e37c2b8d79e4af9e4789f626c595899d94abebe8b38f9a4d"}, + {file = "SQLAlchemy-2.0.20-cp310-cp310-win_amd64.whl", hash = "sha256:d32b5ffef6c5bcb452723a496bad2d4c52b346240c59b3e6dba279f6dcc06c14"}, + {file = "SQLAlchemy-2.0.20-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:dd81466bdbc82b060c3c110b2937ab65ace41dfa7b18681fdfad2f37f27acdd7"}, + {file = "SQLAlchemy-2.0.20-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:6fe7d61dc71119e21ddb0094ee994418c12f68c61b3d263ebaae50ea8399c4d4"}, + {file = "SQLAlchemy-2.0.20-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e4e571af672e1bb710b3cc1a9794b55bce1eae5aed41a608c0401885e3491179"}, + {file = "SQLAlchemy-2.0.20-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3364b7066b3c7f4437dd345d47271f1251e0cfb0aba67e785343cdbdb0fff08c"}, + {file = "SQLAlchemy-2.0.20-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:1be86ccea0c965a1e8cd6ccf6884b924c319fcc85765f16c69f1ae7148eba64b"}, + {file = "SQLAlchemy-2.0.20-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:1d35d49a972649b5080557c603110620a86aa11db350d7a7cb0f0a3f611948a0"}, + {file = "SQLAlchemy-2.0.20-cp311-cp311-win32.whl", hash = "sha256:27d554ef5d12501898d88d255c54eef8414576f34672e02fe96d75908993cf53"}, + {file = "SQLAlchemy-2.0.20-cp311-cp311-win_amd64.whl", hash = "sha256:411e7f140200c02c4b953b3dbd08351c9f9818d2bd591b56d0fa0716bd014f1e"}, + {file = "SQLAlchemy-2.0.20-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:3c6aceebbc47db04f2d779db03afeaa2c73ea3f8dcd3987eb9efdb987ffa09a3"}, + {file = "SQLAlchemy-2.0.20-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7d3f175410a6db0ad96b10bfbb0a5530ecd4fcf1e2b5d83d968dd64791f810ed"}, + {file = "SQLAlchemy-2.0.20-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ea8186be85da6587456c9ddc7bf480ebad1a0e6dcbad3967c4821233a4d4df57"}, + {file = "SQLAlchemy-2.0.20-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:c3d99ba99007dab8233f635c32b5cd24fb1df8d64e17bc7df136cedbea427897"}, + {file = "SQLAlchemy-2.0.20-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:76fdfc0f6f5341987474ff48e7a66c3cd2b8a71ddda01fa82fedb180b961630a"}, + {file = "SQLAlchemy-2.0.20-cp37-cp37m-win32.whl", hash = "sha256:d3793dcf5bc4d74ae1e9db15121250c2da476e1af8e45a1d9a52b1513a393459"}, + {file = "SQLAlchemy-2.0.20-cp37-cp37m-win_amd64.whl", hash = "sha256:79fde625a0a55220d3624e64101ed68a059c1c1f126c74f08a42097a72ff66a9"}, + {file = "SQLAlchemy-2.0.20-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:599ccd23a7146e126be1c7632d1d47847fa9f333104d03325c4e15440fc7d927"}, + {file = "SQLAlchemy-2.0.20-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:1a58052b5a93425f656675673ef1f7e005a3b72e3f2c91b8acca1b27ccadf5f4"}, + {file = "SQLAlchemy-2.0.20-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:79543f945be7a5ada9943d555cf9b1531cfea49241809dd1183701f94a748624"}, + {file = "SQLAlchemy-2.0.20-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:63e73da7fb030ae0a46a9ffbeef7e892f5def4baf8064786d040d45c1d6d1dc5"}, + {file = "SQLAlchemy-2.0.20-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:3ce5e81b800a8afc870bb8e0a275d81957e16f8c4b62415a7b386f29a0cb9763"}, + {file = "SQLAlchemy-2.0.20-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:cb0d3e94c2a84215532d9bcf10229476ffd3b08f481c53754113b794afb62d14"}, + {file = "SQLAlchemy-2.0.20-cp38-cp38-win32.whl", hash = "sha256:8dd77fd6648b677d7742d2c3cc105a66e2681cc5e5fb247b88c7a7b78351cf74"}, + {file = "SQLAlchemy-2.0.20-cp38-cp38-win_amd64.whl", hash = "sha256:6f8a934f9dfdf762c844e5164046a9cea25fabbc9ec865c023fe7f300f11ca4a"}, + {file = "SQLAlchemy-2.0.20-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:26a3399eaf65e9ab2690c07bd5cf898b639e76903e0abad096cd609233ce5208"}, + {file = "SQLAlchemy-2.0.20-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4cde2e1096cbb3e62002efdb7050113aa5f01718035ba9f29f9d89c3758e7e4e"}, + {file = "SQLAlchemy-2.0.20-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d1b09ba72e4e6d341bb5bdd3564f1cea6095d4c3632e45dc69375a1dbe4e26ec"}, + {file = "SQLAlchemy-2.0.20-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1b74eeafaa11372627ce94e4dc88a6751b2b4d263015b3523e2b1e57291102f0"}, + {file = "SQLAlchemy-2.0.20-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:77d37c1b4e64c926fa3de23e8244b964aab92963d0f74d98cbc0783a9e04f501"}, + {file = "SQLAlchemy-2.0.20-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:eefebcc5c555803065128401a1e224a64607259b5eb907021bf9b175f315d2a6"}, + {file = "SQLAlchemy-2.0.20-cp39-cp39-win32.whl", hash = "sha256:3423dc2a3b94125094897118b52bdf4d37daf142cbcf26d48af284b763ab90e9"}, + {file = "SQLAlchemy-2.0.20-cp39-cp39-win_amd64.whl", hash = "sha256:5ed61e3463021763b853628aef8bc5d469fe12d95f82c74ef605049d810f3267"}, + {file = "SQLAlchemy-2.0.20-py3-none-any.whl", hash = "sha256:63a368231c53c93e2b67d0c5556a9836fdcd383f7e3026a39602aad775b14acf"}, + {file = "SQLAlchemy-2.0.20.tar.gz", hash = "sha256:ca8a5ff2aa7f3ade6c498aaafce25b1eaeabe4e42b73e25519183e4566a16fc6"}, ] [package.dependencies] -greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\")"} +greenlet = {version = "!=0.4.17", markers = "platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\""} importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} +typing-extensions = ">=4.2.0" [package.extras] -aiomysql = ["aiomysql", "greenlet (!=0.4.17)"] +aiomysql = ["aiomysql (>=0.2.0)", "greenlet (!=0.4.17)"] aiosqlite = ["aiosqlite", "greenlet (!=0.4.17)", "typing-extensions (!=3.10.0.1)"] asyncio = ["greenlet (!=0.4.17)"] -asyncmy = ["asyncmy (>=0.2.3,!=0.2.4)", "greenlet (!=0.4.17)"] -mariadb-connector = ["mariadb (>=1.0.1,!=1.1.2)"] +asyncmy = ["asyncmy (>=0.2.3,!=0.2.4,!=0.2.6)", "greenlet (!=0.4.17)"] +mariadb-connector = ["mariadb (>=1.0.1,!=1.1.2,!=1.1.5)"] mssql = ["pyodbc"] mssql-pymssql = ["pymssql"] mssql-pyodbc = ["pyodbc"] -mypy = ["mypy (>=0.910)", "sqlalchemy2-stubs"] -mysql = ["mysqlclient (>=1.4.0)", "mysqlclient (>=1.4.0,<2)"] +mypy = ["mypy (>=0.910)"] +mysql = ["mysqlclient (>=1.4.0)"] mysql-connector = ["mysql-connector-python"] -oracle = ["cx-oracle (>=7)", "cx-oracle (>=7,<8)"] +oracle = ["cx-oracle (>=7)"] +oracle-oracledb = ["oracledb (>=1.0.1)"] postgresql = ["psycopg2 (>=2.7)"] postgresql-asyncpg = ["asyncpg", "greenlet (!=0.4.17)"] -postgresql-pg8000 = ["pg8000 (>=1.16.6,!=1.29.0)"] +postgresql-pg8000 = ["pg8000 (>=1.29.1)"] +postgresql-psycopg = ["psycopg (>=3.0.7)"] postgresql-psycopg2binary = ["psycopg2-binary"] postgresql-psycopg2cffi = ["psycopg2cffi"] -pymysql = ["pymysql", "pymysql (<1)"] +postgresql-psycopgbinary = ["psycopg[binary] (>=3.0.7)"] +pymysql = ["pymysql"] sqlcipher = ["sqlcipher3-binary"] [[package]] @@ -1680,4 +1687,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = "<3.12,>=3.7.1" -content-hash = "fbbdefbdb7a019657032357e407e90d074878a6b4a8f3d36bdf5fec7273f46dc" +content-hash = "462f263ca629588c06d46f78c69effaf348ee86660d06bca04ab5e916ebc79ce" diff --git a/pyproject.toml b/pyproject.toml index bc4d2f08..e5910b78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ python = "<3.12,>=3.7.1" requests = "^2.25.1" singer-sdk = ">=0.28,<0.32" psycopg2-binary = "2.9.7" -sqlalchemy = "<2" +sqlalchemy = ">=2.0,<3.0" sshtunnel = "0.4.0" [tool.poetry.dev-dependencies] diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 0b813cd0..03556a22 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -96,7 +96,7 @@ def prepare_table( as_temp_table: True to create a temp table. """ _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sqlalchemy.MetaData(bind=connection, schema=schema_name) + meta = sqlalchemy.MetaData(schema=schema_name) table: sqlalchemy.Table = None if not self.table_exists(full_table_name=full_table_name): table = self.create_empty_table( @@ -109,7 +109,7 @@ def prepare_table( connection=connection, ) return table - meta.reflect(only=[table_name]) + meta.reflect(connection, only=[table_name]) table = meta.tables[ full_table_name ] # So we don't mess up the casing of the Table reference @@ -139,7 +139,7 @@ def copy_table_structure( as_temp_table: True to create a temp table. """ _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sqlalchemy.MetaData(bind=connection, schema=schema_name) + meta = sqlalchemy.MetaData(schema=schema_name) new_table: sqlalchemy.Table = None columns = [] if self.table_exists(full_table_name=full_table_name): @@ -199,8 +199,7 @@ def clone_table( ) else: new_table = sqlalchemy.Table(new_table_name, metadata, *new_columns) - with self._connect() as connection: - new_table.create(bind=connection) + new_table.create(bind=connection) return new_table @staticmethod @@ -425,6 +424,7 @@ def _create_empty_column( column_type=sql_type, ) connection.execute(column_add_ddl) + connection.commit() def get_column_add_ddl( self, @@ -519,6 +519,7 @@ def _adapt_column_type( column_type=compatible_sql_type, ) connection.execute(alter_column_ddl) + connection.commit() def get_column_alter_ddl( self, diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index bc4d0879..c04c8f29 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -99,6 +99,7 @@ def process_batch(self, context: dict) -> None: ) # Drop temp table self.connector.drop_table(table=temp_table, connection=connection) + connection.commit() def generate_temp_table_name(self): """Uuid temp table name.""" @@ -337,11 +338,14 @@ def activate_version(self, new_version: int) -> None: self.logger.info("Hard delete: %s", self.config.get("hard_delete")) if self.config["hard_delete"] is True: - self.connection.execute( - f'DELETE FROM "{self.schema_name}"."{self.table_name}" ' - f"WHERE {self.version_column_name} <= {new_version} " - f"OR {self.version_column_name} IS NULL" - ) + with self.connector._connect() as connection, connection.begin(): + connection.execute( + sqlalchemy.text( + f'DELETE FROM "{self.schema_name}"."{self.table_name}" ' + f"WHERE {self.version_column_name} <= {new_version} " + f"OR {self.version_column_name} IS NULL" + ) + ) return if not self.connector.column_exists( @@ -365,5 +369,5 @@ def activate_version(self, new_version: int) -> None: bindparam("deletedate", value=deleted_at, type_=datetime_type), bindparam("version", value=new_version, type_=integer_type), ) - with self.connector._connect() as connection: + with self.connector._connect() as connection, connection.begin(): connection.execute(query) diff --git a/target_postgres/tests/test_standard_target.py b/target_postgres/tests/test_standard_target.py index 44222f02..71c8b904 100644 --- a/target_postgres/tests/test_standard_target.py +++ b/target_postgres/tests/test_standard_target.py @@ -154,7 +154,7 @@ def test_port_default_config(): engine: sqlalchemy.engine.Engine = connector._engine assert ( - str(engine.url) + engine.url.render_as_string(hide_password=False) == f"{dialect_driver}://{user}:{password}@{host}:5432/{database}" ) @@ -179,7 +179,7 @@ def test_port_config(): engine: sqlalchemy.engine.Engine = connector._engine assert ( - str(engine.url) + engine.url.render_as_string(hide_password=False) == f"{dialect_driver}://{user}:{password}@{host}:5433/{database}" ) @@ -286,7 +286,9 @@ def test_relational_data(postgres_target): ] full_table_name = f"{schema_name}.test_users" - result = connection.execute(f"SELECT * FROM {full_table_name} ORDER BY id") + result = connection.execute( + sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id") + ) result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()] assert result_dict == expected_test_users @@ -299,7 +301,9 @@ def test_relational_data(postgres_target): ] full_table_name = f"{schema_name}.test_locations" - result = connection.execute(f"SELECT * FROM {full_table_name} ORDER BY id") + result = connection.execute( + sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id") + ) result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()] assert result_dict == expected_test_locations @@ -337,7 +341,9 @@ def test_relational_data(postgres_target): ] full_table_name = f"{schema_name}.test_user_in_location" - result = connection.execute(f"SELECT * FROM {full_table_name} ORDER BY id") + result = connection.execute( + sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id") + ) result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()] assert result_dict == expected_test_user_in_location @@ -347,8 +353,10 @@ def test_no_primary_keys(postgres_target): engine = create_engine(postgres_target) table_name = "test_no_pk" full_table_name = postgres_target.config["default_target_schema"] + "." + table_name - with engine.connect() as connection: - result = connection.execute(f"DROP TABLE IF EXISTS {full_table_name}") + with engine.connect() as connection, connection.begin(): + result = connection.execute( + sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}") + ) file_name = f"{table_name}.singer" singer_file_to_target(file_name, postgres_target) @@ -363,7 +371,7 @@ def test_no_primary_keys(postgres_target): # Will populate us with 22 records, we run this twice with engine.connect() as connection: - result = connection.execute(f"SELECT * FROM {full_table_name}") + result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 16 @@ -428,8 +436,10 @@ def test_anyof(postgres_target): schema = postgres_target.config["default_target_schema"] singer_file_to_target(file_name, postgres_target) with engine.connect() as connection: - meta = sqlalchemy.MetaData(bind=connection) - table = sqlalchemy.Table("commits", meta, schema=schema, autoload=True) + meta = sqlalchemy.MetaData() + table = sqlalchemy.Table( + "commits", meta, schema=schema, autoload_with=connection + ) for column in table.c: # {"type":"string"} if column.name == "id": @@ -474,23 +484,29 @@ def test_activate_version_hard_delete(postgres_config_no_ssl): engine = create_engine(pg_hard_delete_true) singer_file_to_target(file_name, pg_hard_delete_true) with engine.connect() as connection: - result = connection.execute(f"SELECT * FROM {full_table_name}") + result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 + with engine.connect() as connection, connection.begin(): # Add a record like someone would if they weren't using the tap target combo result = connection.execute( - f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')" + sqlalchemy.text( + f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')" + ) ) result = connection.execute( - f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')" + sqlalchemy.text( + f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')" + ) ) - result = connection.execute(f"SELECT * FROM {full_table_name}") + with engine.connect() as connection: + result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 singer_file_to_target(file_name, pg_hard_delete_true) # Should remove the 2 records we added manually with engine.connect() as connection: - result = connection.execute(f"SELECT * FROM {full_table_name}") + result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 @@ -500,35 +516,45 @@ def test_activate_version_soft_delete(postgres_target): table_name = "test_activate_version_soft" file_name = f"{table_name}.singer" full_table_name = postgres_target.config["default_target_schema"] + "." + table_name - with engine.connect() as connection: - result = connection.execute(f"DROP TABLE IF EXISTS {full_table_name}") + with engine.connect() as connection, connection.begin(): + result = connection.execute( + sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}") + ) postgres_config_soft_delete = copy.deepcopy(postgres_target._config) postgres_config_soft_delete["hard_delete"] = False pg_soft_delete = TargetPostgres(config=postgres_config_soft_delete) singer_file_to_target(file_name, pg_soft_delete) with engine.connect() as connection: - result = connection.execute(f"SELECT * FROM {full_table_name}") + result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 + with engine.connect() as connection, connection.begin(): # Add a record like someone would if they weren't using the tap target combo result = connection.execute( - f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')" + sqlalchemy.text( + f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')" + ) ) result = connection.execute( - f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')" + sqlalchemy.text( + f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')" + ) ) - result = connection.execute(f"SELECT * FROM {full_table_name}") + with engine.connect() as connection: + result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 singer_file_to_target(file_name, pg_soft_delete) # Should have all records including the 2 we added manually with engine.connect() as connection: - result = connection.execute(f"SELECT * FROM {full_table_name}") + result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 result = connection.execute( - f"SELECT * FROM {full_table_name} where _sdc_deleted_at is NOT NULL" + sqlalchemy.text( + f"SELECT * FROM {full_table_name} where _sdc_deleted_at is NOT NULL" + ) ) assert result.rowcount == 2 @@ -539,8 +565,10 @@ def test_activate_version_deletes_data_properly(postgres_target): table_name = "test_activate_version_deletes_data_properly" file_name = f"{table_name}.singer" full_table_name = postgres_target.config["default_target_schema"] + "." + table_name - with engine.connect() as connection: - result = connection.execute(f"DROP TABLE IF EXISTS {full_table_name}") + with engine.connect() as connection, connection.begin(): + result = connection.execute( + sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}") + ) postgres_config_soft_delete = copy.deepcopy(postgres_target._config) postgres_config_soft_delete["hard_delete"] = True @@ -548,20 +576,27 @@ def test_activate_version_deletes_data_properly(postgres_target): singer_file_to_target(file_name, pg_hard_delete) # Will populate us with 7 records with engine.connect() as connection: + result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + assert result.rowcount == 7 + with engine.connect() as connection, connection.begin(): result = connection.execute( - f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual1', 'Meltano')" + sqlalchemy.text( + f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual1', 'Meltano')" + ) ) result = connection.execute( - f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual2', 'Meltano')" + sqlalchemy.text( + f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual2', 'Meltano')" + ) ) - result = connection.execute(f"SELECT * FROM {full_table_name}") + with engine.connect() as connection: + result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 - # Only has a schema and one activate_version message, should delete all records as it's a higher version than what's currently in the table file_name = f"{table_name}_2.singer" singer_file_to_target(file_name, pg_hard_delete) with engine.connect() as connection: - result = connection.execute(f"SELECT * FROM {full_table_name}") + result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 0 From 2ee2b56e56c2c0af3671575fe4bb5708ad0611d5 Mon Sep 17 00:00:00 2001 From: andyoneal Date: Sun, 3 Sep 2023 20:41:58 -0500 Subject: [PATCH 4/7] fix transaction issues --- target_postgres/connector.py | 19 +++++++--- target_postgres/sinks.py | 73 +++++++++++++++++++----------------- 2 files changed, 51 insertions(+), 41 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 03556a22..525fa19c 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -375,7 +375,7 @@ def prepare_column( sql_type: the SQLAlchemy type. schema_name: the schema name. """ - if not self.column_exists(table.fullname, column_name): + if not self.column_exists(table.fullname, column_name, connection=connection): self._create_empty_column( # We should migrate every function to use sqlalchemy.Table # instead of having to know what the function wants @@ -424,7 +424,6 @@ def _create_empty_column( column_type=sql_type, ) connection.execute(column_add_ddl) - connection.commit() def get_column_add_ddl( self, @@ -481,6 +480,7 @@ def _adapt_column_type( schema_name=schema_name, table_name=table_name, column_name=column_name, + connection=connection, ) # remove collation if present and save it @@ -519,7 +519,6 @@ def _adapt_column_type( column_type=compatible_sql_type, ) connection.execute(alter_column_ddl) - connection.commit() def get_column_alter_ddl( self, @@ -692,6 +691,7 @@ def _get_column_type( schema_name: str, table_name: str, column_name: str, + connection: sqlalchemy.engine.Connection, ) -> sqlalchemy.types.TypeEngine: """Get the SQL type of the declared column. @@ -709,6 +709,7 @@ def _get_column_type( column = self.get_table_columns( schema_name=schema_name, table_name=table_name, + connection=connection, )[column_name] except KeyError as ex: msg = ( @@ -723,6 +724,7 @@ def get_table_columns( self, schema_name: str, table_name: str, + connection: sqlalchemy.engine.Connection, column_names: list[str] | None = None, ) -> dict[str, sqlalchemy.Column]: """Return a list of table columns. @@ -737,7 +739,7 @@ def get_table_columns( Returns: An ordered list of column objects. """ - inspector = sqlalchemy.inspect(self._engine) + inspector = sqlalchemy.inspect(connection) columns = inspector.get_columns(table_name, schema_name) return { @@ -751,7 +753,12 @@ def get_table_columns( or col_meta["name"].casefold() in {col.casefold() for col in column_names} } - def column_exists(self, full_table_name: str, column_name: str) -> bool: + def column_exists( + self, + full_table_name: str, + column_name: str, + connection: sqlalchemy.engine.Connection, + ) -> bool: """Determine if the target column already exists. Args: @@ -765,7 +772,7 @@ def column_exists(self, full_table_name: str, column_name: str) -> bool: assert schema_name is not None assert table_name is not None return column_name in self.get_table_columns( - schema_name=schema_name, table_name=table_name + schema_name=schema_name, table_name=table_name, connection=connection ) diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index c04c8f29..4a79bc1f 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -46,7 +46,7 @@ def setup(self) -> None: self.append_only = False if self.schema_name: self.connector.prepare_schema(self.schema_name) - with self.connector._connect() as connection: + with self.connector._connect() as connection, connection.begin(): self.connector.prepare_table( full_table_name=self.full_table_name, schema=self.schema, @@ -326,19 +326,21 @@ def activate_version(self, new_version: int) -> None: # same as SCHEMA messsages integer_type = self.connector.to_sql_type({"type": "integer"}) - if not self.connector.column_exists( - full_table_name=self.full_table_name, - column_name=self.version_column_name, - ): - self.connector.prepare_column( - self.full_table_name, - self.version_column_name, - sql_type=integer_type, - ) + with self.connector._connect() as connection, connection.begin(): + if not self.connector.column_exists( + full_table_name=self.full_table_name, + column_name=self.version_column_name, + connection=connection, + ): + self.connector.prepare_column( + self.full_table_name, + self.version_column_name, + sql_type=integer_type, + connection=connection, + ) - self.logger.info("Hard delete: %s", self.config.get("hard_delete")) - if self.config["hard_delete"] is True: - with self.connector._connect() as connection, connection.begin(): + self.logger.info("Hard delete: %s", self.config.get("hard_delete")) + if self.config["hard_delete"] is True: connection.execute( sqlalchemy.text( f'DELETE FROM "{self.schema_name}"."{self.table_name}" ' @@ -346,28 +348,29 @@ def activate_version(self, new_version: int) -> None: f"OR {self.version_column_name} IS NULL" ) ) - return + return - if not self.connector.column_exists( - full_table_name=self.full_table_name, - column_name=self.soft_delete_column_name, - ): - self.connector.prepare_column( - self.full_table_name, - self.soft_delete_column_name, - sql_type=datetime_type, + if not self.connector.column_exists( + full_table_name=self.full_table_name, + column_name=self.soft_delete_column_name, + connection=connection, + ): + self.connector.prepare_column( + self.full_table_name, + self.soft_delete_column_name, + sql_type=datetime_type, + connection=connection, + ) + # Need to deal with the case where data doesn't exist for the version column + query = sqlalchemy.text( + f'UPDATE "{self.schema_name}"."{self.table_name}"\n' + f"SET {self.soft_delete_column_name} = :deletedate \n" + f"WHERE {self.version_column_name} < :version " + f"OR {self.version_column_name} IS NULL \n" + f" AND {self.soft_delete_column_name} IS NULL\n" + ) + query = query.bindparams( + bindparam("deletedate", value=deleted_at, type_=datetime_type), + bindparam("version", value=new_version, type_=integer_type), ) - # Need to deal with the case where data doesn't exist for the version column - query = sqlalchemy.text( - f'UPDATE "{self.schema_name}"."{self.table_name}"\n' - f"SET {self.soft_delete_column_name} = :deletedate \n" - f"WHERE {self.version_column_name} < :version " - f"OR {self.version_column_name} IS NULL \n" - f" AND {self.soft_delete_column_name} IS NULL\n" - ) - query = query.bindparams( - bindparam("deletedate", value=deleted_at, type_=datetime_type), - bindparam("version", value=new_version, type_=integer_type), - ) - with self.connector._connect() as connection, connection.begin(): connection.execute(query) From 371b2941de0536f626f0ac02c3b89cb3b16d1dbb Mon Sep 17 00:00:00 2001 From: andyoneal Date: Sun, 3 Sep 2023 20:49:52 -0500 Subject: [PATCH 5/7] begin/commit consistency --- target_postgres/sinks.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 4a79bc1f..a71098e6 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -65,7 +65,7 @@ def process_batch(self, context: dict) -> None: context: Stream partition or context dictionary. """ # Use one connection so we do this all in a single transaction - with self.connector._connect() as connection: + with self.connector._connect() as connection, connection.begin(): # Check structure of table table: sqlalchemy.Table = self.connector.prepare_table( full_table_name=self.full_table_name, @@ -99,7 +99,6 @@ def process_batch(self, context: dict) -> None: ) # Drop temp table self.connector.drop_table(table=temp_table, connection=connection) - connection.commit() def generate_temp_table_name(self): """Uuid temp table name.""" From 90907f2fbecc07b227d0bfe81bbdb1ef128802dd Mon Sep 17 00:00:00 2001 From: andyoneal Date: Wed, 13 Sep 2023 00:36:10 -0500 Subject: [PATCH 6/7] fix create_schema --- poetry.lock | 18 ++---------------- target_postgres/connector.py | 9 +++++++++ 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/poetry.lock b/poetry.lock index 63f92631..0593f3d1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -483,7 +483,6 @@ files = [ {file = "greenlet-2.0.2-cp27-cp27m-win32.whl", hash = "sha256:6c3acb79b0bfd4fe733dff8bc62695283b57949ebcca05ae5c129eb606ff2d74"}, {file = "greenlet-2.0.2-cp27-cp27m-win_amd64.whl", hash = "sha256:283737e0da3f08bd637b5ad058507e578dd462db259f7f6e4c5c365ba4ee9343"}, {file = "greenlet-2.0.2-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:d27ec7509b9c18b6d73f2f5ede2622441de812e7b1a80bbd446cb0633bd3d5ae"}, - {file = "greenlet-2.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d967650d3f56af314b72df7089d96cda1083a7fc2da05b375d2bc48c82ab3f3c"}, {file = "greenlet-2.0.2-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:30bcf80dda7f15ac77ba5af2b961bdd9dbc77fd4ac6105cee85b0d0a5fcf74df"}, {file = "greenlet-2.0.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:26fbfce90728d82bc9e6c38ea4d038cba20b7faf8a0ca53a9c07b67318d46088"}, {file = "greenlet-2.0.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9190f09060ea4debddd24665d6804b995a9c122ef5917ab26e1566dcc712ceeb"}, @@ -492,7 +491,6 @@ files = [ {file = "greenlet-2.0.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:76ae285c8104046b3a7f06b42f29c7b73f77683df18c49ab5af7983994c2dd91"}, {file = "greenlet-2.0.2-cp310-cp310-win_amd64.whl", hash = "sha256:2d4686f195e32d36b4d7cf2d166857dbd0ee9f3d20ae349b6bf8afc8485b3645"}, {file = "greenlet-2.0.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:c4302695ad8027363e96311df24ee28978162cdcdd2006476c43970b384a244c"}, - {file = "greenlet-2.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:d4606a527e30548153be1a9f155f4e283d109ffba663a15856089fb55f933e47"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c48f54ef8e05f04d6eff74b8233f6063cb1ed960243eacc474ee73a2ea8573ca"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a1846f1b999e78e13837c93c778dcfc3365902cfb8d1bdb7dd73ead37059f0d0"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3a06ad5312349fec0ab944664b01d26f8d1f05009566339ac6f63f56589bc1a2"}, @@ -522,7 +520,6 @@ files = [ {file = "greenlet-2.0.2-cp37-cp37m-win32.whl", hash = "sha256:3f6ea9bd35eb450837a3d80e77b517ea5bc56b4647f5502cd28de13675ee12f7"}, {file = "greenlet-2.0.2-cp37-cp37m-win_amd64.whl", hash = "sha256:7492e2b7bd7c9b9916388d9df23fa49d9b88ac0640db0a5b4ecc2b653bf451e3"}, {file = "greenlet-2.0.2-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:b864ba53912b6c3ab6bcb2beb19f19edd01a6bfcbdfe1f37ddd1778abfe75a30"}, - {file = "greenlet-2.0.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:1087300cf9700bbf455b1b97e24db18f2f77b55302a68272c56209d5587c12d1"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux2010_x86_64.whl", hash = "sha256:ba2956617f1c42598a308a84c6cf021a90ff3862eddafd20c3333d50f0edb45b"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fc3a569657468b6f3fb60587e48356fe512c1754ca05a564f11366ac9e306526"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8eab883b3b2a38cc1e050819ef06a7e6344d4a990d24d45bc6f2cf959045a45b"}, @@ -531,7 +528,6 @@ files = [ {file = "greenlet-2.0.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b0ef99cdbe2b682b9ccbb964743a6aca37905fda5e0452e5ee239b1654d37f2a"}, {file = "greenlet-2.0.2-cp38-cp38-win32.whl", hash = "sha256:b80f600eddddce72320dbbc8e3784d16bd3fb7b517e82476d8da921f27d4b249"}, {file = "greenlet-2.0.2-cp38-cp38-win_amd64.whl", hash = "sha256:4d2e11331fc0c02b6e84b0d28ece3a36e0548ee1a1ce9ddde03752d9b79bba40"}, - {file = "greenlet-2.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8512a0c38cfd4e66a858ddd1b17705587900dd760c6003998e9472b77b56d417"}, {file = "greenlet-2.0.2-cp39-cp39-macosx_11_0_x86_64.whl", hash = "sha256:88d9ab96491d38a5ab7c56dd7a3cc37d83336ecc564e4e8816dbed12e5aaefc8"}, {file = "greenlet-2.0.2-cp39-cp39-manylinux2010_x86_64.whl", hash = "sha256:561091a7be172ab497a3527602d467e2b3fbe75f9e783d8b8ce403fa414f71a6"}, {file = "greenlet-2.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:971ce5e14dc5e73715755d0ca2975ac88cfdaefcaab078a284fea6cfabf866df"}, @@ -1198,7 +1194,6 @@ files = [ {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"}, - {file = "PyYAML-6.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290"}, {file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"}, {file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"}, {file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"}, @@ -1206,15 +1201,8 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"}, - {file = "PyYAML-6.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b"}, {file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"}, {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, - {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, - {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, - {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, - {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, - {file = "PyYAML-6.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df"}, {file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"}, @@ -1231,7 +1219,6 @@ files = [ {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"}, - {file = "PyYAML-6.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6"}, {file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"}, {file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"}, {file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"}, @@ -1239,7 +1226,6 @@ files = [ {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"}, - {file = "PyYAML-6.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5"}, {file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"}, {file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"}, {file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"}, @@ -1500,7 +1486,7 @@ files = [ ] [package.dependencies] -greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")"} +greenlet = {version = "!=0.4.17", markers = "platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\""} importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} typing-extensions = ">=4.2.0" @@ -1701,4 +1687,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = "<3.12,>=3.7.1" -content-hash = "103b89827eb98b5912538bac507d48843c8139fa5ae82de81890b2b8bc7caf57" +content-hash = "d7fab16bafd26dcb926a7e353f5db9abf01c8b1211ad20b945e0d28621acf046" diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 5ce93a4c..d48c05fb 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -293,6 +293,15 @@ def pick_best_sql_type(sql_type_array: list): return obj return VARCHAR() + def create_schema(self, schema_name: str) -> None: + """Create target schema. + + Args: + schema_name: The target schema to create. + """ + with self._connect() as conn, conn.begin(): + conn.execute(sqlalchemy.schema.CreateSchema(schema_name)) + def create_empty_table( self, table_name: str, From 49b377a6ae44a6edc15e8ddcd129a38cf18ad7a4 Mon Sep 17 00:00:00 2001 From: andyoneal Date: Tue, 3 Oct 2023 04:42:38 -0500 Subject: [PATCH 7/7] remove create_schema --- target_postgres/connector.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 40b18380..0cdd2677 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -301,15 +301,6 @@ def pick_best_sql_type(sql_type_array: list): return obj return VARCHAR() - def create_schema(self, schema_name: str) -> None: - """Create target schema. - - Args: - schema_name: The target schema to create. - """ - with self._connect() as conn, conn.begin(): - conn.execute(sqlalchemy.schema.CreateSchema(schema_name)) - def create_empty_table( self, table_name: str,