Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-core: Greenplum ingestion #415

Merged
merged 4 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions projects/vdk-core/plugins/vdk-greenplum/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ For example
job_input.execute_query("select 'Hi Greenplum!'")
```

## Ingestion

This plugin allows users to [ingest](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-core/src/vdk/api/job_input.py#L90) data into a Greenplum database.
This can be preferable to inserting data manually, because ingestion automatically handles serializing, packaging and sending the data asynchronously, with configurable batching and throughput.
To do so, you must set the configuration variables needed to connect to Greenplum (see the Configuration section below), plus the following environment variable:
```sh
export VDK_INGEST_METHOD_DEFAULT=GREENPLUM
```

Then, from inside the run function in a Python step, you can use the `send_object_for_ingestion` or `send_tabular_data_for_ingestion` methods to ingest your data.

# Configuration

Run `vdk config-help` and search for the options prefixed with "GREENPLUM_" to see what configuration options are available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from vdk.internal.core.config import Configuration
from vdk.internal.core.config import ConfigurationBuilder
from vdk.plugin.greenplum.greenplum_connection import GreenplumConnection
from vdk.plugin.greenplum.ingest_to_greenplum import IngestToGreenplum


def _connection_by_configuration(configuration: Configuration):
Expand Down Expand Up @@ -59,6 +60,9 @@ def initialize_job(context: JobContext) -> None:
"GREENPLUM",
lambda: _connection_by_configuration(context.core_context.configuration),
)
context.ingester.add_ingester_factory_method(
"GREENPLUM", lambda: IngestToGreenplum(context)
)


@hookimpl
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import List
from typing import Optional
from typing import Tuple

from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Cursor
from vdk.internal.builtin_plugins.ingestion.ingester_base import IIngesterPlugin
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core import errors
from vdk.plugin.greenplum.greenplum_connection import GreenplumConnection

_log = logging.getLogger(__name__)


class IngestToGreenplum(IIngesterPlugin):
    """
    Create a new ingestion mechanism for ingesting to a Greenplum database
    """

    def __init__(self, context: JobContext):
        """
        :param context: JobContext
            the job context; its ``connections`` are used to open the
            "GREENPLUM" connection when a payload is ingested
        """
        self._context = context

    def ingest_payload(
        self,
        payload: List[dict],
        destination_table: Optional[str],
        target: Optional[str] = None,
        collection_id: Optional[str] = None,
    ) -> None:
        """
        Insert all objects of the payload into the destination table with a
        single multi-row INSERT statement.
        See parent class doc for details

        :param payload: List[dict]
            the objects to ingest; each must contain a key for every column
            of the destination table
        :param destination_table: Optional[str]
            the name of the table the payload is inserted into
        :param target: Optional[str]
            not used by this implementation
        :param collection_id: Optional[str]
            only used for logging
        """
        if not payload:
            # An INSERT with an empty VALUES list is not valid SQL, so there
            # is nothing to send for an empty payload.
            _log.debug(
                f"Empty payload for table: {destination_table}; nothing to ingest."
            )
            return

        _log.info(
            f"Ingesting payloads to table: {destination_table} in Greenplum database; "
            f"collection_id: {collection_id}"
        )

        with self._context.connections.open_connection(
            "GREENPLUM"
        ).connect() as connection:
            cursor = connection.cursor()
            # Deliberately outside the try block below: errors raised while
            # inspecting the destination table propagate unchanged.
            query, parameters = self._populate_query_parameters_tuple(
                destination_table, cursor, payload
            )

            try:
                cursor.execute(query, parameters)
                _log.debug("Payload was ingested.")
            except Exception as e:
                errors.log_and_rethrow(
                    errors.find_whom_to_blame_from_exception(e),
                    _log,
                    "Failed to send payload",
                    "Unknown error. Error message was : " + str(e),
                    "Will not be able to send the payload for ingestion",
                    "See error message for help ",
                    e,
                    wrap_in_vdk_error=True,
                )

    @staticmethod
    def _populate_query_parameters_tuple(
        destination_table: str, cursor: PEP249Cursor, payload: List[dict]
    ) -> Tuple[str, tuple]:
        """
        Returns insert into destination table tuple of query and parameters;
        E.g. for a table dest_table with columns val1, val2 and payload
        [{'val1': 1, 'val2': 2}, {'val1': 3, 'val2': 4}, {'val1': 5, 'val2': 6}]
        this method will return:
        'INSERT INTO dest_table (val1, val2) VALUES (%s, %s), (%s, %s), (%s, %s)', (1, 2, 3, 4, 5, 6)

        :param destination_table: str
            the name of the destination table
        :param cursor: PEP249Cursor
            the database cursor
        :param payload: List[dict]
            the payloads to be ingested; expected to be non-empty, otherwise
            the returned query has an empty VALUES list
        :return: Tuple[str, tuple]
            tuple containing the query and the flattened row values, ordered
            row by row and, within a row, by the table's column order
        :raises KeyError:
            if a payload object has no key for one of the table's columns
        """
        # NOTE(review): the table name is interpolated into the SQL text as is,
        # so it must come from trusted job code - TODO confirm with callers.
        # The query selects no rows; it is run only so that cursor.description
        # exposes the column names of the destination table.
        cursor.execute(f"SELECT * FROM {destination_table} WHERE false")
        columns = [c.name for c in cursor.description]

        row_placeholder = f"({', '.join(['%s'] * len(columns))})"

        return (
            f"INSERT INTO {destination_table} ({', '.join(columns)}) "
            f"VALUES {', '.join([row_placeholder] * len(payload))}",
            tuple(obj[column] for obj in payload for column in columns),
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0


def run(job_input):
    """
    Data job step used by the ingestion tests.

    Sends the same two-field object for ingestion into "test_table" five
    times through the given job input.
    """
    record = dict(some_data="some_test_data", more_data="more_test_data")
    repetitions = 5

    for _ in range(repetitions):
        job_input.send_object_for_ingestion(
            payload=record, destination_table="test_table"
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os
import pathlib
from unittest import mock
from unittest import TestCase

import pytest
import vdk.internal.core.errors
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.greenplum import greenplum_plugin
from vdk.plugin.greenplum.ingest_to_greenplum import IngestToGreenplum
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner
from vdk.plugin.test_utils.util_funcs import get_test_job_path

# Names of the environment variables that configure VDK for these tests.
# Each constant holds the name of the variable, not its value.
VDK_DB_DEFAULT_TYPE = "VDK_DB_DEFAULT_TYPE"
VDK_GREENPLUM_DBNAME = "VDK_GREENPLUM_DBNAME"
VDK_GREENPLUM_USER = "VDK_GREENPLUM_USER"
VDK_GREENPLUM_PASSWORD = "VDK_GREENPLUM_PASSWORD"
VDK_GREENPLUM_HOST = "VDK_GREENPLUM_HOST"
VDK_GREENPLUM_PORT = "VDK_GREENPLUM_PORT"
VDK_INGEST_METHOD_DEFAULT = "VDK_INGEST_METHOD_DEFAULT"


@pytest.mark.usefixtures("greenplum_service")
@mock.patch.dict(
    os.environ,
    {
        VDK_DB_DEFAULT_TYPE: "GREENPLUM",
        VDK_GREENPLUM_DBNAME: "postgres",
        VDK_GREENPLUM_USER: "gpadmin",
        VDK_GREENPLUM_PASSWORD: "pivotal",
        VDK_GREENPLUM_HOST: "localhost",
        VDK_GREENPLUM_PORT: "5432",
        VDK_INGEST_METHOD_DEFAULT: "GREENPLUM",
    },
)
class IngestToGreenplumTests(TestCase):
    """
    End-to-end tests that run the ingestion test job through the VDK CLI
    with GREENPLUM set as the default ingestion method.

    NOTE(review): presumably the "greenplum_service" fixture provides a
    Greenplum instance reachable with the settings patched in above - confirm
    against the fixture definition.
    """

    @staticmethod
    def _ingest_job_path():
        """Return the path of the ingestion test job located next to this file."""
        return get_test_job_path(
            pathlib.Path(os.path.dirname(os.path.abspath(__file__))),
            "test_ingest_to_greenplum_job",
        )

    def test_ingest_to_greenplum(self):
        """The job's five identical objects end up as five rows in test_table."""
        runner = CliEntryBasedTestRunner(greenplum_plugin)

        # Start from a clean table so that rows or a table left over from an
        # earlier run cannot break the exact-output assertion below.
        runner.invoke(["greenplum-query", "--query", "DROP TABLE IF EXISTS test_table"])

        create_table_result = runner.invoke(
            [
                "greenplum-query",
                "--query",
                "CREATE TABLE test_table (some_data varchar, more_data varchar)",
            ]
        )
        # Fail here, with a clear cause, if the set-up itself did not work.
        cli_assert_equal(0, create_table_result)

        ingest_job_result = runner.invoke(["run", self._ingest_job_path()])

        cli_assert_equal(0, ingest_job_result)

        check_result = runner.invoke(
            ["greenplum-query", "--query", "SELECT * FROM test_table"]
        )

        assert check_result.stdout == (
            "--------------  --------------\n"
            "some_test_data  more_test_data\n"
            "some_test_data  more_test_data\n"
            "some_test_data  more_test_data\n"
            "some_test_data  more_test_data\n"
            "some_test_data  more_test_data\n"
            "--------------  --------------\n"
        )

    def test_ingest_to_greenplum_no_dest_table(self):
        """Ingesting into a missing table reports UndefinedTable in the job output."""
        runner = CliEntryBasedTestRunner(greenplum_plugin)

        # Make sure the destination table does not exist before the job runs.
        runner.invoke(["greenplum-query", "--query", "DROP TABLE IF EXISTS test_table"])

        ingest_job_result = runner.invoke(["run", self._ingest_job_path()])

        assert "UndefinedTable" in ingest_job_result.output