diff --git a/projects/vdk-core/plugins/vdk-greenplum/.plugin-ci.yml b/projects/vdk-core/plugins/vdk-greenplum/.plugin-ci.yml new file mode 100644 index 0000000000..01e8e4857b --- /dev/null +++ b/projects/vdk-core/plugins/vdk-greenplum/.plugin-ci.yml @@ -0,0 +1,35 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 + +image: "python:3.7" + +.build-vdk-greenplum: + image: docker:19.03.8 + services: + - docker:19.03.8-dind + variables: + DOCKER_HOST: tcp://localhost:2375 + DOCKER_DRIVER: overlay2 + DOCKER_TLS_CERTDIR: "" + PLUGIN_NAME: vdk-greenplum + extends: .build-plugin + +build-py37-vdk-greenplum: + extends: .build-vdk-greenplum + image: "python:3.7" + + +build-py38-vdk-greenplum: + extends: .build-vdk-greenplum + image: "python:3.8" + + +build-py39-vdk-greenplum: + extends: .build-vdk-greenplum + image: "python:3.9" + + +release-vdk-greenplum: + variables: + PLUGIN_NAME: vdk-greenplum + extends: .release-plugin diff --git a/projects/vdk-core/plugins/vdk-greenplum/README.md b/projects/vdk-core/plugins/vdk-greenplum/README.md new file mode 100644 index 0000000000..06c2ecc330 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-greenplum/README.md @@ -0,0 +1,33 @@ +This plugin allows vdk-core to interface with and execute queries against a Greenplum database. + +# Usage + +Run +```bash +pip install vdk-greenplum +``` + +After this, data jobs will have access to a Greenplum database connection, managed by Versatile Data Kit SDK. + +If it is the only database plugin installed , vdk would automatically use it. +Otherwise, users need to set VDK_DB_DEFAULT_TYPE=GREENPLUM as an environment variable or set 'db_default_type' option in the data job config file (config.ini). + +For example + +```python + def run(job_input: IJobInput): + job_input.execute_query("select 'Hi Greenplum!'") +``` + +# Configuration + +Run vdk config-help - search for those prefixed with "GREENPLUM_" to see what configuration options are available. + +# Testing + +Testing this plugin locally requires installing the dependencies listed in plugins/vdk-greenplum/requirements.txt + +Run +```bash +pip install -r requirements.txt +``` diff --git a/projects/vdk-core/plugins/vdk-greenplum/requirements.txt b/projects/vdk-core/plugins/vdk-greenplum/requirements.txt new file mode 100644 index 0000000000..fc60ac0846 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-greenplum/requirements.txt @@ -0,0 +1,9 @@ +vdk-core +psycopg2-binary +tabulate + +# testing requirements +click +vdk-test-utils +pytest-docker +docker-compose diff --git a/projects/vdk-core/plugins/vdk-greenplum/setup.py b/projects/vdk-core/plugins/vdk-greenplum/setup.py new file mode 100644 index 0000000000..e6d6c3fa50 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-greenplum/setup.py @@ -0,0 +1,31 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import pathlib + +import setuptools + + +__version__ = "0.0.1" + +setuptools.setup( + name="vdk-greenplum", + version=__version__, + url="https://github.com/vmware/versatile-data-kit", + description="Versatile Data Kit SDK plugin provides support for Greenplum database and greenplum transformation templates.", + long_description=pathlib.Path("README.md").read_text(), + long_description_content_type="text/markdown", + install_requires=["vdk-core", "psycopg2-binary"], + package_dir={"": "src"}, + packages=setuptools.find_namespace_packages(where="src"), + include_package_data=True, + entry_points={ + "vdk.plugin.run": ["vdk-greenplum = vdk.plugin.greenplum.greenplum_plugin"] + }, + classifiers=[ + "Development Status :: 4 - Beta", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + ], +) diff --git a/projects/vdk-core/plugins/vdk-greenplum/src/vdk/plugin/greenplum/greenplum_connection.py b/projects/vdk-core/plugins/vdk-greenplum/src/vdk/plugin/greenplum/greenplum_connection.py new file mode 100644 index 0000000000..bab76b2970 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-greenplum/src/vdk/plugin/greenplum/greenplum_connection.py @@ -0,0 +1,114 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import logging +from typing import Any +from typing import List +from typing import Optional + +from vdk.internal.builtin_plugins.connection.managed_connection_base import ( + ManagedConnectionBase, +) + +_log = logging.getLogger(__name__) + + +class GreenplumConnection(ManagedConnectionBase): + def __init__( + self, + dsn: str = None, + connection_factory=None, + cursor_factory=None, + dbname: str = None, + # database - deprecated alias + user: str = None, + password: str = None, + host: str = None, + port: int = None, + **kwargs, + ): + r""" + See https://www.psycopg.org/docs/module.html + Create a new database connection. + + The connection parameters can be specified as a string: + + conn = psycopg2.connect("dbname=test user=greenplum password=secret") + + or using a set of keyword arguments: + + conn = psycopg2.connect(database="test", user="greenplum", password="secret") + + Or as a mix of both. The basic connection parameters are: + + :param dsn: Optional[str] + libpq connection string, + https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING + :param connection_factory: + Using the *connection_factory* parameter a different class or connections + factory can be specified. It should be a callable object taking a dsn + argument. + :param cursor_factory: + Using the *cursor_factory* parameter, a new default cursor factory will be + used by cursor(). + :param dbname: Optional[str] + The database name + :param database: Optional[str] + The database name (only as keyword argument) + :param user: Optional[str] + User name used to authenticate + :param password: Optional[str] + Password used to authenticate + :param host: Optional[str] + Database host address (defaults to UNIX socket if not provided) + :param port: Optional[int] + Connection port number (defaults to 5432 if not provided) + :param \**kwargs: + Any other keyword parameter will be passed to the underlying client + library: the list of supported parameters depends on the library version. + See bellow + + :Keyword Arguments: + * *async* (boolean) -- + Using *async*=True an asynchronous connection will be created. *async_* is + a valid alias (for Python versions where ``async`` is a keyword). + """ + super().__init__(_log) + + self._dsn = dsn + self._connection_factory = connection_factory + self._cursor_factory = cursor_factory + self._dbname = dbname + self._user = user + self._password = password + self._host = host + self._port = port + self._kwargs = kwargs + + dsn_message_optional = "" + if self._dsn: + dsn_message_optional = f"dsn: {dsn}, " + _log.debug( + f"Creating new Greenplum connection for {dsn_message_optional}" + f"user: {user} to [host:port/dbname]: {host}:{port}/{dbname}" + ) + + def _connect(self): + import psycopg2 + + return psycopg2.connect( + dsn=self._dsn, + connection_factory=self._connection_factory, + cursor_factory=self._cursor_factory, + dbname=self._dbname, + user=self._user, + password=self._password, + host=self._host, + port=self._port, + **self._kwargs, + ) + + def execute_query(self, query: str) -> List[List[Any]]: + try: + return super().execute_query(query) + finally: + self.commit() diff --git a/projects/vdk-core/plugins/vdk-greenplum/src/vdk/plugin/greenplum/greenplum_plugin.py b/projects/vdk-core/plugins/vdk-greenplum/src/vdk/plugin/greenplum/greenplum_plugin.py new file mode 100644 index 0000000000..bbe9df045f --- /dev/null +++ b/projects/vdk-core/plugins/vdk-greenplum/src/vdk/plugin/greenplum/greenplum_plugin.py @@ -0,0 +1,77 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import click +from tabulate import tabulate +from vdk.api.plugin.hook_markers import hookimpl +from vdk.internal.builtin_plugins.run.job_context import JobContext +from vdk.internal.core.config import Configuration +from vdk.internal.core.config import ConfigurationBuilder +from vdk.plugin.greenplum.greenplum_connection import GreenplumConnection + + +def _connection_by_configuration(configuration: Configuration): + return GreenplumConnection( + dsn=configuration.get_value("GREENPLUM_DSN"), + dbname=configuration.get_value("GREENPLUM_DBNAME"), + user=configuration.get_value("GREENPLUM_USER"), + password=configuration.get_value("GREENPLUM_PASSWORD"), + host=configuration.get_value("GREENPLUM_HOST"), + port=configuration.get_value("GREENPLUM_PORT"), + ) + + +@hookimpl +def vdk_configure(config_builder: ConfigurationBuilder) -> None: + """ + Here we define what configuration settings are needed for Greenplum with reasonable defaults + """ + config_builder.add( + key="GREENPLUM_DSN", + default_value=None, + description="libpq connection string, " + "https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING", + ) + config_builder.add( + key="GREENPLUM_DBNAME", default_value=None, description="Database name" + ) + config_builder.add( + key="GREENPLUM_USER", default_value=None, description="User name" + ) + config_builder.add( + key="GREENPLUM_PASSWORD", default_value=None, description="User password" + ) + config_builder.add( + key="GREENPLUM_HOST", + default_value=None, + description="The host we need to connect to, defaulting to " + "UNIX socket, https://www.psycopg.org/docs/module.html", + ) + config_builder.add( + key="GREENPLUM_PORT", + default_value=None, + description="The port to connect to, defaulting to 5432", + ) + + +@hookimpl +def initialize_job(context: JobContext) -> None: + context.connections.add_open_connection_factory_method( + "GREENPLUM", + lambda: _connection_by_configuration(context.core_context.configuration), + ) + + +@hookimpl +def vdk_command_line(root_command: click.Group): + root_command.add_command(greenplum_query) + + +@click.command(name="greenplum-query", help="executes SQL query against Greenplum") +@click.option("-q", "--query", type=click.STRING, required=True) +@click.pass_context +def greenplum_query(ctx: click.Context, query): + click.echo( + tabulate( + _connection_by_configuration(ctx.obj.configuration).execute_query(query) + ) + ) diff --git a/projects/vdk-core/plugins/vdk-greenplum/tests/conftest.py b/projects/vdk-core/plugins/vdk-greenplum/tests/conftest.py new file mode 100644 index 0000000000..39e220937e --- /dev/null +++ b/projects/vdk-core/plugins/vdk-greenplum/tests/conftest.py @@ -0,0 +1,60 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import os +import time +from unittest import mock + +import pytest +from vdk.plugin.greenplum import greenplum_plugin +from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner + +VDK_DB_DEFAULT_TYPE = "VDK_DB_DEFAULT_TYPE" +VDK_GREENPLUM_DBNAME = "VDK_GREENPLUM_DBNAME" +VDK_GREENPLUM_USER = "VDK_GREENPLUM_USER" +VDK_GREENPLUM_PASSWORD = "VDK_GREENPLUM_PASSWORD" +VDK_GREENPLUM_HOST = "VDK_GREENPLUM_HOST" +VDK_GREENPLUM_PORT = "VDK_GREENPLUM_PORT" + + +def _is_responsive(runner): + try: + result = runner.invoke(["greenplum-query", "--query", "SELECT 1"]) + if result.exit_code == 0: + return True + except: + return False + + +@pytest.fixture(scope="session") +def docker_compose_file(pytestconfig): + return os.path.join( + os.path.dirname(os.path.abspath(__file__)), "docker-compose.yml" + ) + + +@pytest.fixture(scope="session") +@mock.patch.dict( + os.environ, + { + VDK_DB_DEFAULT_TYPE: "GREENPLUM", + VDK_GREENPLUM_DBNAME: "postgres", + VDK_GREENPLUM_USER: "gpadmin", + VDK_GREENPLUM_PASSWORD: "pivotal", + VDK_GREENPLUM_HOST: "localhost", + VDK_GREENPLUM_PORT: "5432", + }, +) +def greenplum_service(docker_ip, docker_services): + """Ensure that Greenplum service is up and responsive.""" + runner = CliEntryBasedTestRunner(greenplum_plugin) + + # give the server some time to start before checking if it is ready + # before adding this sleep there were intermittent fails of the CI/CD with error: + # requests.exceptions.ConnectionError: + # ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer')) + # More info: https://stackoverflow.com/questions/383738/104-connection-reset-by-peer-socket-error-or-when-does-closing-a-socket-resu + time.sleep(3) + + docker_services.wait_until_responsive( + timeout=30.0, pause=0.3, check=lambda: _is_responsive(runner) + ) diff --git a/projects/vdk-core/plugins/vdk-greenplum/tests/docker-compose.yml b/projects/vdk-core/plugins/vdk-greenplum/tests/docker-compose.yml new file mode 100644 index 0000000000..e034cb1b16 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-greenplum/tests/docker-compose.yml @@ -0,0 +1,11 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 + +services: + greenplum: + image: "datagrip/greenplum" + ports: + - "5432:5432" + environment: + GREENPLUM_USER: gpadmin + GREENPLUM_PASSWORD: pivotal diff --git a/projects/vdk-core/plugins/vdk-greenplum/tests/jobs/sql-job/10_create_table.sql b/projects/vdk-core/plugins/vdk-greenplum/tests/jobs/sql-job/10_create_table.sql new file mode 100644 index 0000000000..73339fcc9a --- /dev/null +++ b/projects/vdk-core/plugins/vdk-greenplum/tests/jobs/sql-job/10_create_table.sql @@ -0,0 +1 @@ +CREATE TABLE stocks (date text, symbol text, price real) diff --git a/projects/vdk-core/plugins/vdk-greenplum/tests/jobs/sql-job/20_populate_table.sql b/projects/vdk-core/plugins/vdk-greenplum/tests/jobs/sql-job/20_populate_table.sql new file mode 100644 index 0000000000..bbe7257c58 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-greenplum/tests/jobs/sql-job/20_populate_table.sql @@ -0,0 +1 @@ +INSERT INTO stocks VALUES ('2020-01-01', 'GOOG', 123.0), ('2020-01-01', 'GOOG', 123.0) diff --git a/projects/vdk-core/plugins/vdk-greenplum/tests/test_vdk_greenplum_utils.py b/projects/vdk-core/plugins/vdk-greenplum/tests/test_vdk_greenplum_utils.py new file mode 100644 index 0000000000..b20fe15c88 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-greenplum/tests/test_vdk_greenplum_utils.py @@ -0,0 +1,48 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import os +import unittest +from unittest import mock + +import pytest +from click.testing import Result +from vdk.plugin.greenplum import greenplum_plugin +from vdk.plugin.test_utils.util_funcs import cli_assert_equal +from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner +from vdk.plugin.test_utils.util_funcs import jobs_path_from_caller_directory + +VDK_DB_DEFAULT_TYPE = "VDK_DB_DEFAULT_TYPE" +VDK_GREENPLUM_DBNAME = "VDK_GREENPLUM_DBNAME" +VDK_GREENPLUM_USER = "VDK_GREENPLUM_USER" +VDK_GREENPLUM_PASSWORD = "VDK_GREENPLUM_PASSWORD" +VDK_GREENPLUM_HOST = "VDK_GREENPLUM_HOST" +VDK_GREENPLUM_PORT = "VDK_GREENPLUM_PORT" + + +@pytest.mark.usefixtures("greenplum_service") +@mock.patch.dict( + os.environ, + { + VDK_DB_DEFAULT_TYPE: "GREENPLUM", + VDK_GREENPLUM_DBNAME: "postgres", + VDK_GREENPLUM_USER: "gpadmin", + VDK_GREENPLUM_PASSWORD: "pivotal", + VDK_GREENPLUM_HOST: "localhost", + VDK_GREENPLUM_PORT: "5432", + }, +) +class GreenplumUtilsTests(unittest.TestCase): + def setUp(self) -> None: + self.__runner = CliEntryBasedTestRunner(greenplum_plugin) + + def test_execute_query(self) -> None: + result: Result = self.__runner.invoke( + ["run", jobs_path_from_caller_directory("sql-job")] + ) + cli_assert_equal(0, result) + + actual_rs: Result = self.__runner.invoke( + ["greenplum-query", "--query", "SELECT * FROM stocks"] + ) + cli_assert_equal(0, actual_rs) + assert "GOOG" in actual_rs.output