diff --git a/projects/vdk-plugins/vdk-trino/.plugin-ci.yml b/projects/vdk-plugins/vdk-trino/.plugin-ci.yml index 2ed29ab4ff..40e4a3bd3d 100644 --- a/projects/vdk-plugins/vdk-trino/.plugin-ci.yml +++ b/projects/vdk-plugins/vdk-trino/.plugin-ci.yml @@ -4,9 +4,11 @@ image: "python:3.7" .build-vdk-trino: - image: docker:19.03.8 + image: docker:23.0.1 services: - - docker:19.03.8-dind + - name: docker:23.0.1-dind + # explicitly disable tls to avoid docker startup interruption + command: ["--tls=false"] variables: DOCKER_HOST: tcp://localhost:2375 DOCKER_DRIVER: overlay2 diff --git a/projects/vdk-plugins/vdk-trino/requirements.txt b/projects/vdk-plugins/vdk-trino/requirements.txt index 9cd92b9b4a..f12b88a559 100644 --- a/projects/vdk-plugins/vdk-trino/requirements.txt +++ b/projects/vdk-plugins/vdk-trino/requirements.txt @@ -2,7 +2,7 @@ # testing requirements click docker-compose -pytest-docker +testcontainers trino vdk-core vdk-test-utils diff --git a/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/trino_utils.py b/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/trino_utils.py index cdc5d02116..bbf0b96e67 100644 --- a/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/trino_utils.py +++ b/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/trino_utils.py @@ -203,7 +203,7 @@ def perform_safe_move_data_to_table_step( "Then try to rerun the data job OR\n" "3. Report the issue to support team.""", ) - if result: + if result is not None: log.debug("Target table was successfully created, and we can drop backup") self.drop_table(to_db, backup_table_name) diff --git a/projects/vdk-plugins/vdk-trino/tests/conftest.py b/projects/vdk-plugins/vdk-trino/tests/conftest.py index 97ddcc2509..7378b8c464 100644 --- a/projects/vdk-plugins/vdk-trino/tests/conftest.py +++ b/projects/vdk-plugins/vdk-trino/tests/conftest.py @@ -5,8 +5,10 @@ from unittest import mock import pytest -from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner -from vdk.plugin.trino import trino_plugin +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs + +TRINO_IMAGE = "trinodb/trino:latest" VDK_TRINO_HOST = "VDK_TRINO_HOST" VDK_DB_DEFAULT_TYPE = "VDK_DB_DEFAULT_TYPE" @@ -14,22 +16,6 @@ VDK_TRINO_USE_SSL = "VDK_TRINO_USE_SSL" -def is_responsive(runner): - try: - result = runner.invoke(["trino-query", "--query", "SELECT 1"]) - if result.exit_code == 0: - return True - except: - return False - - -@pytest.fixture(scope="session") -def docker_compose_file(pytestconfig): - return os.path.join( - os.path.dirname(os.path.abspath(__file__)), "docker-compose.yml" - ) - - @pytest.fixture(scope="session") @mock.patch.dict( os.environ, @@ -40,19 +26,28 @@ def docker_compose_file(pytestconfig): VDK_TRINO_USE_SSL: "False", }, ) -def trino_service(docker_ip, docker_services): +def trino_service(request): """Ensure that Trino service is up and responsive.""" # os.system("echo Check open ports:") # os.system("ss -lntu") - runner = CliEntryBasedTestRunner(trino_plugin) - - # give the server some time to start before checking if it is ready - # before adding this sleep there were intermittent fails of the CI/CD with error: - # requests.exceptions.ConnectionError: - # ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer')) - # More info: https://stackoverflow.com/questions/383738/104-connection-reset-by-peer-socket-error-or-when-does-closing-a-socket-resu - time.sleep(3) - - docker_services.wait_until_responsive( - timeout=30.0, pause=0.3, check=lambda: is_responsive(runner) - ) + port = int(os.environ[VDK_TRINO_PORT]) + container = DockerContainer(TRINO_IMAGE).with_bind_ports(port, port) + try: + container.start() + wait_for_logs(container, "SERVER STARTED", timeout=120) + time.sleep(2) + print( + f"Trino service started on port {container.get_exposed_port(port)} and host {container.get_container_host_ip()}" + ) + except Exception as e: + print(f"Failed to start Trino service: {e}") + print(f"Container logs: {container.get_logs()}") + raise e + + def stop_container(): + container.stop() + print("Trino service stopped") + + request.addfinalizer(stop_container) + + return container diff --git a/projects/vdk-plugins/vdk-trino/tests/docker-compose.yml b/projects/vdk-plugins/vdk-trino/tests/docker-compose.yml deleted file mode 100644 index 25b047b189..0000000000 --- a/projects/vdk-plugins/vdk-trino/tests/docker-compose.yml +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright 2021-2023 VMware, Inc. -# SPDX-License-Identifier: Apache-2.0 - -services: - trino: - image: "trinodb/trino:372" - ports: - - "8080:8080" diff --git a/projects/vdk-plugins/vdk-trino/tests/test_vdk_templates.py b/projects/vdk-plugins/vdk-trino/tests/test_vdk_templates.py index 24e02976b6..4637134680 100644 --- a/projects/vdk-plugins/vdk-trino/tests/test_vdk_templates.py +++ b/projects/vdk-plugins/vdk-trino/tests/test_vdk_templates.py @@ -4,6 +4,7 @@ import os import pathlib import unittest +import uuid from unittest import mock import pytest @@ -27,7 +28,9 @@ def trino_move_data_to_table_break_tmp_to_target( obj, from_db: str, from_table_name: str, to_db: str, to_table_name: str ): - if from_table_name == "tmp_dw_scmdb_people" and to_table_name == "dw_scmdb_people": + if from_table_name.startswith("tmp_dw_people") and to_table_name.startswith( + "dw_people" + ): obj.drop_table(from_db, from_table_name) return org_move_data_to_table(obj, from_db, from_table_name, to_db, to_table_name) @@ -35,34 +38,44 @@ def trino_move_data_to_table_break_tmp_to_target( def trino_move_data_to_table_break_tmp_to_target_and_restore( obj, from_db: str, from_table_name: str, to_db: str, to_table_name: str ): - if from_table_name == "tmp_dw_scmdb_people" and to_table_name == "dw_scmdb_people": + if from_table_name.startswith("tmp_dw_people") and to_table_name.startswith( + "dw_people" + ): obj.drop_table(from_db, from_table_name) - if ( - from_table_name == "backup_dw_scmdb_people" - and to_table_name == "dw_scmdb_people" + + if from_table_name.startswith("backup_dw_people") and to_table_name.startswith( + "dw_people" ): obj.drop_table(from_db, from_table_name) + return org_move_data_to_table(obj, from_db, from_table_name, to_db, to_table_name) +@pytest.fixture(autouse=True) +def mock_os_environ(): + with mock.patch.dict( + os.environ, + { + VDK_DB_DEFAULT_TYPE: "TRINO", + VDK_TRINO_PORT: "8080", + VDK_TRINO_USE_SSL: "False", + VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY: "INSERT_SELECT", + }, + ): + yield + + @pytest.mark.usefixtures("trino_service") -@mock.patch.dict( - os.environ, - { - VDK_DB_DEFAULT_TYPE: "TRINO", - VDK_TRINO_PORT: "8080", - VDK_TRINO_USE_SSL: "False", - VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY: "INSERT_SELECT", - }, -) -class TemplateRegressionTests(unittest.TestCase): +class TestTemplates(unittest.TestCase): def setUp(self) -> None: self.__runner = CliEntryBasedTestRunner(trino_plugin) + self.__schema = f"source_{uuid.uuid4().hex[:10]}" + self.__trino_query("create schema " + self.__schema) def test_scd1_template(self) -> None: - source_schema = "default" + source_schema = self.__schema source_view = "vw_dim_org" - target_schema = "default" + target_schema = self.__schema target_table = "dw_dim_org" result: Result = self.__runner.invoke( @@ -86,6 +99,7 @@ def test_scd1_template(self) -> None: cli_assert_equal(0, result) + # refactor below to use vdk-trino-query actual_rs: Result = self.__runner.invoke( [ "trino-query", @@ -112,10 +126,17 @@ def test_scd1_template(self) -> None: actual_rs.output == expected_rs.output ), f"Elements in {source_view} and {target_table} differ." + def test_scd1_template_using_rename_strategy(self) -> None: + with mock.patch.dict( + os.environ, + {VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY: "RENAME"}, + ): + self.test_scd1_template() + def test_scd1_template_reserved_args(self) -> None: - source_schema = "default" + source_schema = self.__schema source_view = "alter" - target_schema = "default" + target_schema = self.__schema target_table = "table" result: Result = self.__runner.invoke( @@ -166,10 +187,10 @@ def test_scd1_template_reserved_args(self) -> None: ), f"Elements in {source_view} and {target_table} differ." def test_scd2_template(self) -> None: - test_schema = "default" - source_view = "vw_scmdb_people" - target_table = "dw_scmdb_people" - expect_table = "ex_scmdb_people" + test_schema = self.__schema + source_view = "vw_people_scd2" + target_table = "dw_people_scd2" + expect_table = "ex_people_scd2" result: Result = self.__scd2_template_execute( test_schema, source_view, target_table, expect_table @@ -182,8 +203,15 @@ def test_scd2_template(self) -> None: 1, self.__template_table_exists(test_schema, "backup_" + target_table) ) + def test_scd2_template_using_rename_strategy(self) -> None: + with mock.patch.dict( + os.environ, + {VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY: "RENAME"}, + ): + self.test_scd2_template() + def test_scd2_template_reserved_args(self) -> None: - test_schema = "default" + test_schema = self.__schema source_view = "alter" target_table = "table" expect_table = "between" @@ -202,10 +230,10 @@ def test_scd2_template_reserved_args(self) -> None: ) def test_scd2_template_restore_target_from_backup_on_start(self) -> None: - test_schema = "default" - source_view = "vw_scmdb_people" - target_table = "dw_scmdb_people" - expect_table = "ex_scmdb_people" + test_schema = self.__schema + source_view = "vw_people_scd2_restore" + target_table = "dw_people_scd2_restore" + expect_table = "ex_people_scd2_restore" result: Result = self.__scd2_template_execute( test_schema, source_view, target_table, expect_table, True @@ -224,10 +252,10 @@ def test_scd2_template_restore_target_from_backup_on_start(self) -> None: new=trino_move_data_to_table_break_tmp_to_target, ) def test_scd2_template_fail_last_step_and_restore_target(self): - test_schema = "default" - source_view = "vw_scmdb_people" - target_table = "dw_scmdb_people" - expect_table = "ex_scmdb_people" + test_schema = self.__schema + source_view = "vw_people" + target_table = "dw_people_scd2_fail_restore" + expect_table = "ex_people_scd2_fail_restore" result: Result = self.__scd2_template_execute( test_schema, source_view, target_table, expect_table @@ -243,10 +271,10 @@ def test_scd2_template_fail_last_step_and_restore_target(self): new=trino_move_data_to_table_break_tmp_to_target_and_restore, ) def test_scd2_template_fail_last_step_and_fail_restore_target(self): - test_schema = "default" - source_view = "vw_scmdb_people" - target_table = "dw_scmdb_people" - expect_table = "ex_scmdb_people" + test_schema = self.__schema + source_view = "vw_people_scd2_fail_fail_restore" + target_table = "dw_people_scd2_fail_fail_restore" + expect_table = "ex_people_scd2_fail_fail_restore" result: Result = self.__scd2_template_execute( test_schema, source_view, target_table, expect_table @@ -261,7 +289,7 @@ def test_scd2_template_fail_last_step_and_fail_restore_target(self): ), "Missing log for losing target schema." def test_fact_periodic_snapshot_template(self) -> None: - test_schema = "default" + test_schema = self.__schema source_view = "vw_fact_sddc_daily" target_table = "dw_fact_sddc_daily" expect_table = "ex_fact_sddc_daily" @@ -275,8 +303,15 @@ def test_fact_periodic_snapshot_template(self) -> None: test_schema, target_table, expect_table ) + def test_fact_periodic_snapshot_template_using_rename_strategy(self) -> None: + with mock.patch.dict( + os.environ, + {VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY: "RENAME"}, + ): + self.test_fact_periodic_snapshot_template() + def test_fact_periodic_snapshot_template_reserved_args(self) -> None: - test_schema = "default" + test_schema = self.__schema source_view = "alter" target_table = "table" expect_table = "between" @@ -291,10 +326,10 @@ def test_fact_periodic_snapshot_template_reserved_args(self) -> None: ) def test_fact_periodic_snapshot_empty_source(self) -> None: - test_schema = "default" - source_view = "vw_fact_sddc_daily" - target_table = "dw_fact_sddc_daily" - expect_table = "ex_fact_sddc_daily" + test_schema = self.__schema + source_view = "vw_fact_sddc_daily_empty_source" + target_table = "dw_fact_sddc_daily_empty_source" + expect_table = "ex_fact_sddc_daily_empty_source" result: Result = self.__runner.invoke( [ @@ -331,10 +366,10 @@ def test_fact_periodic_snapshot_empty_source(self) -> None: def test_fact_periodic_snapshot_template_restore_target_from_backup_on_start( self, ) -> None: - test_schema = "default" - source_view = "vw_scmdb_people" - target_table = "dw_scmdb_people" - expect_table = "ex_scmdb_people" + test_schema = self.__schema + source_view = "vw_people_fact_restore" + target_table = "dw_people_fact_restore" + expect_table = "ex_people_fact_restore" result: Result = self.__fact_periodic_snapshot_template_execute( test_schema, source_view, target_table, expect_table, True @@ -355,10 +390,10 @@ def test_fact_periodic_snapshot_template_restore_target_from_backup_on_start( new=trino_move_data_to_table_break_tmp_to_target, ) def test_fact_periodic_snapshot_template_fail_last_step_and_restore_target(self): - test_schema = "default" - source_view = "vw_scmdb_people" - target_table = "dw_scmdb_people" - expect_table = "ex_scmdb_people" + test_schema = self.__schema + source_view = "vw_people_fact_fail_restore" + target_table = "dw_people_fact_fail_restore" + expect_table = "ex_people_fact_fail_restore" result: Result = self.__fact_periodic_snapshot_template_execute( test_schema, source_view, target_table, expect_table @@ -376,10 +411,10 @@ def test_fact_periodic_snapshot_template_fail_last_step_and_restore_target(self) def test_fact_periodic_snapshot_template_fail_last_step_and_fail_restore_target( self, ): - test_schema = "default" - source_view = "vw_scmdb_people" - target_table = "dw_scmdb_people" - expect_table = "ex_scmdb_people" + test_schema = self.__schema + source_view = "vw_people_fact_fail_fail_restore" + target_table = "dw_people_fact_fail_fail_restore" + expect_table = "ex_people_fact_fail_fail_restore" result: Result = self.__fact_periodic_snapshot_template_execute( test_schema, source_view, target_table, expect_table @@ -570,15 +605,11 @@ def __template_table_exists(self, schema_name, target_name) -> Result: ] ) - -@mock.patch.dict( - os.environ, - { - VDK_DB_DEFAULT_TYPE: "TRINO", - VDK_TRINO_PORT: "8080", - VDK_TRINO_USE_SSL: "False", - VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY: "RENAME", - }, -) -class TemplateRegressionTestsRenameStrategy(TemplateRegressionTests): - pass + def __trino_query(self, query: str) -> Result: + return self.__runner.invoke( + [ + "trino-query", + "--query", + query, + ] + )