Skip to content

Commit

Permalink
vdk-snowflake: Refactor plugin code
Browse files Browse the repository at this point in the history
After receiving feedback on the code structure of the
vdk-snowflake plugin, a refactoring was necessary to
optimize it.

This change optimizes the configuration of the plugin
and changes the result message printed after executing
a query to use tabular format, instead of json string.

Testing Done: Added unit test and tested locally by
running a data job and queries

Signed-off-by: Andon Andonov <[email protected]>
  • Loading branch information
doks5 committed Sep 13, 2021
1 parent d1a37fb commit 73daa41
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from vdk.internal.core.config import Configuration
from vdk.internal.core.config import ConfigurationBuilder

SNOWFLAKE_ACCOUNT = "SNOWFLAKE_ACCOUNT"
SNOWFLAKE_USER = "SNOWFLAKE_USER"
SNOWFLAKE_PASSWORD = "SNOWFLAKE_PASSWORD"
SNOWFLAKE_WAREHOUSE = "SNOWFLAKE_WAREHOUSE"
SNOWFLAKE_DATABASE = "SNOWFLAKE_DATABASE"
SNOWFLAKE_SCHEMA = "SNOWFLAKE_SCHEMA"


class SnowflakeConfiguration:
def __init__(self, config: Configuration) -> None:
self.__config = config

def get_snowflake_account(self):
return self.__config.get_required_value(SNOWFLAKE_ACCOUNT)

def get_snowflake_user(self):
return self.__config.get_required_value(SNOWFLAKE_USER)

def get_snowflake_password(self):
return self.__config.get_required_value(SNOWFLAKE_PASSWORD)

def get_snowflake_warehouse(self):
return self.__config.get_value(SNOWFLAKE_WAREHOUSE)

def get_snowflake_database(self):
return self.__config.get_value(SNOWFLAKE_DATABASE)

def get_snowflake_schema(self):
return self.__config.get_value(SNOWFLAKE_SCHEMA)


def add_definitions(config_builder: ConfigurationBuilder):
config_builder.add(
key=SNOWFLAKE_ACCOUNT,
default_value=None,
description="The Snowflake account identifier as described in https://docs.snowflake.com/en/user-guide/admin-account-identifier.html It is required to connect to a Snowflake instance.",
)
config_builder.add(key=SNOWFLAKE_USER, default_value=None, description="User name")
config_builder.add(
key=SNOWFLAKE_PASSWORD, default_value=None, description="User password"
)
config_builder.add(
key=SNOWFLAKE_WAREHOUSE,
default_value=None,
description="The warehouse to be used.",
)
config_builder.add(
key=SNOWFLAKE_DATABASE,
default_value=None,
description="The snowflake database to be used.",
)
config_builder.add(
key=SNOWFLAKE_SCHEMA, default_value=None, description="The database schema"
)
Original file line number Diff line number Diff line change
@@ -1,98 +1,64 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Callable

import click
import pluggy
from snowflake.connector.errors import ProgrammingError
from vdk.api.plugin.hook_markers import hookimpl
from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Connection
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.builtin_plugins.run.step import Step
from vdk.internal.core.config import ConfigurationBuilder
from vdk.internal.core.context import CoreContext
from vdk.internal.core.errors import UserCodeError
from vdk.internal.core.statestore import ImmutableStoreKey
from vdk.internal.core.config import Configuration
from vdk.internal.snowflake_connection import SnowflakeConnection
from vdk.internal.snowflake_configuration import add_definitions
from vdk.internal.snowflake_configuration import SnowflakeConfiguration

"""
VDK-Snowflake Plugin
"""
log = logging.getLogger(__name__)


@hookimpl(tryfirst=True)
def vdk_configure(config_builder: ConfigurationBuilder) -> None:
def _new_snowflake_connection(configuration: Configuration) -> PEP249Connection:
"""
Here we define what configuration settings are needed for snowflake with reasonable defaults
Create new Snowflake connection instance.
"""
config_builder.add(
key="SNOWFLAKE_ACCOUNT",
default_value="localhost",
description="The Snowflake account identifier as described in https://docs.snowflake.com/en/user-guide/admin-account-identifier.html It is required to connect to a Snowflake instance.",
)
config_builder.add(
key="SNOWFLAKE_SCHEMA", default_value=None, description="The database schema"
)
config_builder.add(
key="SNOWFLAKE_WAREHOUSE",
default_value=None,
description="The warehouse to be used.",
config = SnowflakeConfiguration(configuration)
return SnowflakeConnection(
account=config.get_snowflake_account(),
user=config.get_snowflake_user(),
password=config.get_snowflake_password(),
warehouse=config.get_snowflake_warehouse(),
database=config.get_snowflake_database(),
schema=config.get_snowflake_schema(),
)
config_builder.add(
key="SNOWFLAKE_DATABASE",
default_value=None,
description="The snowflake database to be used.",
)
config_builder.add(
key="SNOWFLAKE_USER", default_value="unknown", description="User name"
)
config_builder.add(
key="SNOWFLAKE_PASSWORD", default_value=None, description="User password"
)


SnowflakeConnectionFunc = Callable[[], PEP249Connection]
CONNECTION_FUNC_KEY = ImmutableStoreKey[SnowflakeConnectionFunc](
"snowflake-connection-method"
)


@hookimpl
def vdk_initialize(context: CoreContext) -> None:
configuration = context.configuration

def new_connection() -> PEP249Connection:
connection = SnowflakeConnection(
account=configuration.get_required_value("SNOWFLAKE_ACCOUNT"),
schema=configuration.get_value("SNOWFLAKE_SCHEMA"),
warehouse=configuration.get_value("SNOWFLAKE_WAREHOUSE"),
database=configuration.get_value("SNOWFLAKE_DATABASE"),
user=configuration.get_required_value("SNOWFLAKE_USER"),
password=configuration.get_required_value("SNOWFLAKE_PASSWORD"),
)
return connection

context.state.set(CONNECTION_FUNC_KEY, new_connection)
@hookimpl(tryfirst=True)
def vdk_configure(config_builder: ConfigurationBuilder) -> None:
"""
Here we define what configuration settings are needed for snowflake with reasonable defaults
"""
add_definitions(config_builder)


@hookimpl
def initialize_job(context: JobContext) -> None:
context.connections.add_open_connection_factory_method(
"SNOWFLAKE", context.core_context.state.get(CONNECTION_FUNC_KEY)
"SNOWFLAKE",
lambda: _new_snowflake_connection(context.core_context.configuration),
)


@click.command(name="snowflake-query", help="executes SQL query against Snowflake")
@click.option("-q", "--query", type=click.STRING, required=True)
@click.pass_context
def snowflake_query(ctx: click.Context, query):
with ctx.obj.state.get(CONNECTION_FUNC_KEY)() as conn:
res = conn.execute_query(query)
import json
from tabulate import tabulate

conn = _new_snowflake_connection(ctx.obj.configuration)

click.echo(json.dumps(res, indent=2))
click.echo(tabulate(conn.execute_query(query)))
conn.close()


@hookimpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import pytest
import snowflake
from vdk.internal.core import errors
from vdk.internal.snowflake_connection import SnowflakeConnection


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os
from unittest import mock

import pytest
from click.testing import Result
from vdk.internal import snowflake_plugin
from vdk.internal.snowflake_connection import SnowflakeConnection
from vdk.internal.test_utils.util_funcs import cli_assert_equal
from vdk.internal.test_utils.util_funcs import CliEntryBasedTestRunner


@pytest.fixture
def mocked_connection(monkeypatch):
def mock_execute_query(*args, **kwargs):
return [["Query successfully executed."]]

monkeypatch.delattr(
"vdk.internal.snowflake_connection.SnowflakeConnection._connect"
)
monkeypatch.setattr(SnowflakeConnection, "execute_query", mock_execute_query)


def test_snowflake_plugin(mocked_connection):
"""
Test if the configuration of the Snowflake plugin
and its general setup work as expected.
"""
with mock.patch.dict(
os.environ,
{
"VDK_DB_DEFAULT_TYPE": "SNOWFLAKE",
"VDK_SNOWFLAKE_ACCOUNT": "testaccount",
"VDK_SNOWFLAKE_USER": "testuser",
"VDK_SNOWFLAKE_PASSWORD": "testpassword",
},
):
runner = CliEntryBasedTestRunner(snowflake_plugin)

query_result: Result = runner.invoke(
["snowflake-query", "--query", f"SELECT 1"]
)

cli_assert_equal(0, query_result)
assert "Query successfully executed." in query_result.output

0 comments on commit 73daa41

Please sign in to comment.