-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
vdk-core: Greenplum support #361
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
# Copyright 2021 VMware, Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
image: "python:3.7" | ||
|
||
.build-vdk-greenplum: | ||
image: docker:19.03.8 | ||
services: | ||
- docker:19.03.8-dind | ||
variables: | ||
DOCKER_HOST: tcp://localhost:2375 | ||
DOCKER_DRIVER: overlay2 | ||
DOCKER_TLS_CERTDIR: "" | ||
PLUGIN_NAME: vdk-greenplum | ||
extends: .build-plugin | ||
|
||
build-py37-vdk-greenplum: | ||
extends: .build-vdk-greenplum | ||
image: "python:3.7" | ||
|
||
|
||
build-py38-vdk-greenplum: | ||
extends: .build-vdk-greenplum | ||
image: "python:3.8" | ||
|
||
|
||
build-py39-vdk-greenplum: | ||
extends: .build-vdk-greenplum | ||
image: "python:3.9" | ||
|
||
|
||
release-vdk-greenplum: | ||
variables: | ||
PLUGIN_NAME: vdk-greenplum | ||
extends: .release-plugin |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
This plugin allows vdk-core to interface with and execute queries against a Greenplum database. | ||
|
||
# Usage | ||
|
||
Run | ||
```bash | ||
pip install vdk-greenplum | ||
``` | ||
|
||
After this, data jobs will have access to a Greenplum database connection, managed by Versatile Data Kit SDK. | ||
|
||
If it is the only database plugin installed , vdk would automatically use it. | ||
Otherwise, users need to set VDK_DB_DEFAULT_TYPE=GREENPLUM as an environment variable or set 'db_default_type' option in the data job config file (config.ini). | ||
|
||
For example | ||
|
||
```python | ||
def run(job_input: IJobInput): | ||
job_input.execute_query("select 'Hi Greenplum!'") | ||
``` | ||
|
||
# Configuration | ||
|
||
Run vdk config-help - search for those prefixed with "GREENPLUM_" to see what configuration options are available. | ||
|
||
# Testing | ||
|
||
Testing this plugin locally requires installing the dependencies listed in plugins/vdk-greenplum/requirements.txt | ||
|
||
Run | ||
```bash | ||
pip install -r requirements.txt | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
vdk-core | ||
psycopg2-binary | ||
tabulate | ||
|
||
# testing requirements | ||
click | ||
vdk-test-utils | ||
pytest-docker | ||
docker-compose |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,31 @@ | ||||||
# Copyright 2021 VMware, Inc. | ||||||
# SPDX-License-Identifier: Apache-2.0 | ||||||
import pathlib | ||||||
|
||||||
import setuptools | ||||||
|
||||||
|
||||||
__version__ = "0.0.1" | ||||||
|
||||||
setuptools.setup( | ||||||
name="vdk-greenplum", | ||||||
version=__version__, | ||||||
url="https://github.com/vmware/versatile-data-kit", | ||||||
description="Versatile Data Kit SDK plugin provides support for Greenplum database and greenplum transformation templates.", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit
Suggested change
|
||||||
long_description=pathlib.Path("README.md").read_text(), | ||||||
long_description_content_type="text/markdown", | ||||||
install_requires=["vdk-core", "psycopg2-binary"], | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't tabulate also be included here? |
||||||
package_dir={"": "src"}, | ||||||
packages=setuptools.find_namespace_packages(where="src"), | ||||||
include_package_data=True, | ||||||
entry_points={ | ||||||
"vdk.plugin.run": ["vdk-greenplum = vdk.plugin.greenplum.greenplum_plugin"] | ||||||
}, | ||||||
classifiers=[ | ||||||
"Development Status :: 4 - Beta", | ||||||
"License :: OSI Approved :: Apache Software License", | ||||||
"Programming Language :: Python :: 3.7", | ||||||
"Programming Language :: Python :: 3.8", | ||||||
"Programming Language :: Python :: 3.9", | ||||||
], | ||||||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
# Copyright 2021 VMware, Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import logging | ||
from typing import Any | ||
from typing import List | ||
from typing import Optional | ||
|
||
from vdk.internal.builtin_plugins.connection.managed_connection_base import ( | ||
ManagedConnectionBase, | ||
) | ||
|
||
_log = logging.getLogger(__name__) | ||
|
||
|
||
class GreenplumConnection(ManagedConnectionBase): | ||
def __init__( | ||
self, | ||
dsn: str = None, | ||
connection_factory=None, | ||
cursor_factory=None, | ||
dbname: str = None, | ||
# database - deprecated alias | ||
user: str = None, | ||
password: str = None, | ||
host: str = None, | ||
port: int = None, | ||
**kwargs, | ||
): | ||
r""" | ||
See https://www.psycopg.org/docs/module.html | ||
Create a new database connection. | ||
|
||
The connection parameters can be specified as a string: | ||
|
||
conn = psycopg2.connect("dbname=test user=greenplum password=secret") | ||
|
||
or using a set of keyword arguments: | ||
|
||
conn = psycopg2.connect(database="test", user="greenplum", password="secret") | ||
|
||
Or as a mix of both. The basic connection parameters are: | ||
|
||
:param dsn: Optional[str] | ||
libpq connection string, | ||
https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING | ||
:param connection_factory: | ||
Using the *connection_factory* parameter a different class or connections | ||
factory can be specified. It should be a callable object taking a dsn | ||
argument. | ||
:param cursor_factory: | ||
Using the *cursor_factory* parameter, a new default cursor factory will be | ||
used by cursor(). | ||
:param dbname: Optional[str] | ||
The database name | ||
:param database: Optional[str] | ||
The database name (only as keyword argument) | ||
:param user: Optional[str] | ||
User name used to authenticate | ||
:param password: Optional[str] | ||
Password used to authenticate | ||
:param host: Optional[str] | ||
Database host address (defaults to UNIX socket if not provided) | ||
:param port: Optional[int] | ||
Connection port number (defaults to 5432 if not provided) | ||
:param \**kwargs: | ||
Any other keyword parameter will be passed to the underlying client | ||
library: the list of supported parameters depends on the library version. | ||
See bellow | ||
|
||
:Keyword Arguments: | ||
* *async* (boolean) -- | ||
Using *async*=True an asynchronous connection will be created. *async_* is | ||
a valid alias (for Python versions where ``async`` is a keyword). | ||
""" | ||
super().__init__(_log) | ||
|
||
self._dsn = dsn | ||
self._connection_factory = connection_factory | ||
self._cursor_factory = cursor_factory | ||
self._dbname = dbname | ||
self._user = user | ||
self._password = password | ||
self._host = host | ||
self._port = port | ||
self._kwargs = kwargs | ||
|
||
dsn_message_optional = "" | ||
if self._dsn: | ||
dsn_message_optional = f"dsn: {dsn}, " | ||
_log.debug( | ||
f"Creating new Greenplum connection for {dsn_message_optional}" | ||
f"user: {user} to [host:port/dbname]: {host}:{port}/{dbname}" | ||
) | ||
|
||
def _connect(self): | ||
import psycopg2 | ||
|
||
return psycopg2.connect( | ||
dsn=self._dsn, | ||
connection_factory=self._connection_factory, | ||
cursor_factory=self._cursor_factory, | ||
dbname=self._dbname, | ||
user=self._user, | ||
password=self._password, | ||
host=self._host, | ||
port=self._port, | ||
**self._kwargs, | ||
) | ||
|
||
def execute_query(self, query: str) -> List[List[Any]]: | ||
try: | ||
return super().execute_query(query) | ||
finally: | ||
self.commit() |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,77 @@ | ||||||
# Copyright 2021 VMware, Inc. | ||||||
# SPDX-License-Identifier: Apache-2.0 | ||||||
import click | ||||||
from tabulate import tabulate | ||||||
from vdk.api.plugin.hook_markers import hookimpl | ||||||
from vdk.internal.builtin_plugins.run.job_context import JobContext | ||||||
from vdk.internal.core.config import Configuration | ||||||
from vdk.internal.core.config import ConfigurationBuilder | ||||||
from vdk.plugin.greenplum.greenplum_connection import GreenplumConnection | ||||||
|
||||||
|
||||||
def _connection_by_configuration(configuration: Configuration): | ||||||
return GreenplumConnection( | ||||||
dsn=configuration.get_value("GREENPLUM_DSN"), | ||||||
dbname=configuration.get_value("GREENPLUM_DBNAME"), | ||||||
user=configuration.get_value("GREENPLUM_USER"), | ||||||
password=configuration.get_value("GREENPLUM_PASSWORD"), | ||||||
host=configuration.get_value("GREENPLUM_HOST"), | ||||||
port=configuration.get_value("GREENPLUM_PORT"), | ||||||
) | ||||||
|
||||||
|
||||||
@hookimpl | ||||||
def vdk_configure(config_builder: ConfigurationBuilder) -> None: | ||||||
""" | ||||||
Here we define what configuration settings are needed for Greenplum with reasonable defaults | ||||||
""" | ||||||
config_builder.add( | ||||||
key="GREENPLUM_DSN", | ||||||
default_value=None, | ||||||
description="libpq connection string, " | ||||||
"https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING", | ||||||
) | ||||||
config_builder.add( | ||||||
key="GREENPLUM_DBNAME", default_value=None, description="Database name" | ||||||
) | ||||||
config_builder.add( | ||||||
key="GREENPLUM_USER", default_value=None, description="User name" | ||||||
) | ||||||
config_builder.add( | ||||||
key="GREENPLUM_PASSWORD", default_value=None, description="User password" | ||||||
) | ||||||
config_builder.add( | ||||||
key="GREENPLUM_HOST", | ||||||
default_value=None, | ||||||
description="The host we need to connect to, defaulting to " | ||||||
"UNIX socket, https://www.psycopg.org/docs/module.html", | ||||||
) | ||||||
config_builder.add( | ||||||
key="GREENPLUM_PORT", | ||||||
default_value=None, | ||||||
description="The port to connect to, defaulting to 5432", | ||||||
) | ||||||
|
||||||
|
||||||
@hookimpl | ||||||
def initialize_job(context: JobContext) -> None: | ||||||
context.connections.add_open_connection_factory_method( | ||||||
"GREENPLUM", | ||||||
lambda: _connection_by_configuration(context.core_context.configuration), | ||||||
) | ||||||
|
||||||
|
||||||
@hookimpl | ||||||
def vdk_command_line(root_command: click.Group): | ||||||
root_command.add_command(greenplum_query) | ||||||
|
||||||
|
||||||
@click.command(name="greenplum-query", help="executes SQL query against Greenplum") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
@click.option("-q", "--query", type=click.STRING, required=True) | ||||||
@click.pass_context | ||||||
def greenplum_query(ctx: click.Context, query): | ||||||
click.echo( | ||||||
tabulate( | ||||||
_connection_by_configuration(ctx.obj.configuration).execute_query(query) | ||||||
) | ||||||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
# Copyright 2021 VMware, Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import os | ||
import time | ||
from unittest import mock | ||
|
||
import pytest | ||
from vdk.plugin.greenplum import greenplum_plugin | ||
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner | ||
|
||
VDK_DB_DEFAULT_TYPE = "VDK_DB_DEFAULT_TYPE" | ||
VDK_GREENPLUM_DBNAME = "VDK_GREENPLUM_DBNAME" | ||
VDK_GREENPLUM_USER = "VDK_GREENPLUM_USER" | ||
VDK_GREENPLUM_PASSWORD = "VDK_GREENPLUM_PASSWORD" | ||
VDK_GREENPLUM_HOST = "VDK_GREENPLUM_HOST" | ||
VDK_GREENPLUM_PORT = "VDK_GREENPLUM_PORT" | ||
|
||
|
||
def _is_responsive(runner): | ||
try: | ||
result = runner.invoke(["greenplum-query", "--query", "SELECT 1"]) | ||
if result.exit_code == 0: | ||
return True | ||
except: | ||
return False | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def docker_compose_file(pytestconfig): | ||
return os.path.join( | ||
os.path.dirname(os.path.abspath(__file__)), "docker-compose.yml" | ||
) | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
@mock.patch.dict( | ||
os.environ, | ||
{ | ||
VDK_DB_DEFAULT_TYPE: "GREENPLUM", | ||
VDK_GREENPLUM_DBNAME: "postgres", | ||
VDK_GREENPLUM_USER: "gpadmin", | ||
VDK_GREENPLUM_PASSWORD: "pivotal", | ||
VDK_GREENPLUM_HOST: "localhost", | ||
VDK_GREENPLUM_PORT: "5432", | ||
}, | ||
) | ||
def greenplum_service(docker_ip, docker_services): | ||
"""Ensure that Greenplum service is up and responsive.""" | ||
runner = CliEntryBasedTestRunner(greenplum_plugin) | ||
|
||
# give the server some time to start before checking if it is ready | ||
# before adding this sleep there were intermittent fails of the CI/CD with error: | ||
# requests.exceptions.ConnectionError: | ||
# ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer')) | ||
# More info: https://stackoverflow.com/questions/383738/104-connection-reset-by-peer-socket-error-or-when-does-closing-a-socket-resu | ||
time.sleep(3) | ||
|
||
docker_services.wait_until_responsive( | ||
timeout=30.0, pause=0.3, check=lambda: _is_responsive(runner) | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
# Copyright 2021 VMware, Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
services: | ||
greenplum: | ||
image: "datagrip/greenplum" | ||
ports: | ||
- "5432:5432" | ||
environment: | ||
GREENPLUM_USER: gpadmin | ||
GREENPLUM_PASSWORD: pivotal |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
CREATE TABLE stocks (date text, symbol text, price real) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
INSERT INTO stocks VALUES ('2020-01-01', 'GOOG', 123.0), ('2020-01-01', 'GOOG', 123.0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes me think whether we should extend config-help to allow config variable sorting. If one has 15 or 20 plugins installed, it might be very annoying to have to scroll through them to find the one you're looking for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is obviously unrelated to this PR.