Update branch #3

Merged
merged 9 commits into from
Mar 8, 2023
2 changes: 1 addition & 1 deletion kedro-airflow/kedro_airflow/__init__.py
@@ -1,3 +1,3 @@
""" Kedro plugin for running a project with Airflow """
"""Kedro plugin for running a project with Airflow."""

__version__ = "0.5.1"
3 changes: 1 addition & 2 deletions kedro-airflow/kedro_airflow/plugin.py
@@ -12,8 +12,7 @@


@click.group(name="Kedro-Airflow")
def commands():
"""Kedro plugin for running a project with Airflow"""
def commands(): # pylint: disable=missing-function-docstring
pass


1 change: 1 addition & 0 deletions kedro-airflow/tests/conftest.py
@@ -34,4 +34,5 @@ def metadata(cli_runner): # pylint: disable=unused-argument
project_path,
kedro_version,
project_path / "src",
kedro_version,
)
1 change: 1 addition & 0 deletions kedro-datasets/RELEASE.md
100755 → 100644
@@ -29,6 +29,7 @@
## Bug fixes and other changes
* Fixed doc string formatting in `VideoDataSet` causing the documentation builds to fail.


# Release 1.0.0:

First official release of Kedro-Datasets.
69 changes: 69 additions & 0 deletions kedro-datasets/kedro_datasets/pandas/sql_dataset.py
@@ -1,6 +1,7 @@
"""``SQLDataSet`` to load and save data to a SQL backend."""

import copy
import datetime as dt
import re
from pathlib import PurePosixPath
from typing import Any, Dict, NoReturn, Optional
@@ -22,6 +23,7 @@
"psycopg2": "psycopg2",
"mysqldb": "mysqlclient",
"cx_Oracle": "cx_Oracle",
"mssql": "pyodbc",
}

DRIVER_ERROR_MESSAGE = """
@@ -321,7 +323,49 @@ class SQLQueryDataSet(AbstractDataSet[None, pd.DataFrame]):
>>> credentials=credentials)
>>>
>>> sql_data = data_set.load()
>>>
Example of usage for mssql:
::


>>> credentials = {"server": "localhost", "port": "1433",
>>> "database": "TestDB", "user": "SA",
>>> "password": "StrongPassword"}
>>> def _make_mssql_connection_str(
>>> server: str, port: str, database: str, user: str, password: str
>>> ) -> str:
>>> import pyodbc # noqa
>>> from sqlalchemy.engine import URL # noqa
>>>
>>> driver = pyodbc.drivers()[-1]
>>> connection_str = (f"DRIVER={driver};SERVER={server},{port};DATABASE={database};"
>>> f"ENCRYPT=yes;UID={user};PWD={password};"
>>> "TrustServerCertificate=yes;")
>>> return URL.create("mssql+pyodbc", query={"odbc_connect": connection_str})
>>> connection_str = _make_mssql_connection_str(**credentials)
>>> data_set = SQLQueryDataSet(credentials={"con": connection_str},
>>> sql="SELECT TOP 5 * FROM TestTable;")
>>> df = data_set.load()

In addition, here is an example of a catalog entry with date parsing:
::


>>> mssql_dataset:
>>> type: kedro_datasets.pandas.SQLQueryDataSet
>>> credentials: mssql_credentials
>>> sql: >
>>> SELECT *
>>> FROM DateTable
>>> WHERE date >= ? AND date <= ?
>>> ORDER BY date
>>> load_args:
>>> params:
>>> - ${begin}
>>> - ${end}
>>> index_col: date
>>> parse_dates:
>>> date: "%Y-%m-%d %H:%M:%S.%f0 %z"
"""

# using Any because of Sphinx but it should be
@@ -413,6 +457,8 @@ def __init__( # pylint: disable=too-many-arguments
self._connection_str = credentials["con"]
self._execution_options = execution_options or {}
self.create_connection(self._connection_str)
if "mssql" in self._connection_str:
self.adapt_mssql_date_params()

@classmethod
def create_connection(cls, connection_str: str) -> None:
@@ -456,3 +502,26 @@ def _load(self) -> pd.DataFrame:

def _save(self, data: None) -> NoReturn:
raise DataSetError("'save' is not supported on SQLQueryDataSet")

# For mssql only
def adapt_mssql_date_params(self) -> None:
"""We need to change the format of datetime parameters.
MSSQL expects datetime in the exact format %Y-%m-%dT%H:%M:%S.
Here, we also accept plain dates.
`pyodbc` does not accept named parameters; they must be provided as a list."""
params = self._load_args.get("params", [])
if not isinstance(params, list):
raise DataSetError(
"Unrecognized `params` format. It can only be a `list`, "
f"got {type(params)!r}"
)
new_load_args = []
for value in params:
try:
as_date = dt.date.fromisoformat(value)
new_val = dt.datetime.combine(as_date, dt.time.min)
new_load_args.append(new_val.strftime("%Y-%m-%dT%H:%M:%S"))
except (TypeError, ValueError):
new_load_args.append(value)
if new_load_args:
self._load_args["params"] = new_load_args
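
The adaptation above only rewrites ISO-formatted date strings; every other value in `params` is passed through untouched. A minimal standalone sketch of the same logic (the function name and sample values are illustrative, not part of the dataset):

import datetime as dt


def adapt_mssql_params(params: list) -> list:
    """Rewrite ISO date strings into the %Y-%m-%dT%H:%M:%S form MSSQL expects."""
    adapted = []
    for value in params:
        try:
            as_date = dt.date.fromisoformat(value)  # e.g. "2023-03-08"
            as_datetime = dt.datetime.combine(as_date, dt.time.min)
            adapted.append(as_datetime.strftime("%Y-%m-%dT%H:%M:%S"))
        except (TypeError, ValueError):
            adapted.append(value)  # non-date parameters pass through unchanged
    return adapted


print(adapt_mssql_params(["2023-03-08", 42, "not-a-date"]))
# ['2023-03-08T00:00:00', 42, 'not-a-date']
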
8 changes: 8 additions & 0 deletions kedro-datasets/kedro_datasets/polars/__init__.py
@@ -0,0 +1,8 @@
"""``AbstractDataSet`` implementations that produce polars DataFrames."""

__all__ = ["CSVDataSet"]

from contextlib import suppress

with suppress(ImportError):
from .csv_dataset import CSVDataSet
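
The `with suppress(ImportError)` guard makes the new `polars` dependency optional: if `polars` is not installed, importing `kedro_datasets.polars` still succeeds and `CSVDataSet` is simply not exported. A small sketch of how calling code can detect that at runtime (illustrative only, not part of this PR):

# Probe for the optional polars dataset at runtime.
try:
    from kedro_datasets.polars import CSVDataSet
except ImportError:
    CSVDataSet = None  # polars is not installed, so the name was never exported

if CSVDataSet is None:
    print("Install the 'polars' package to enable kedro_datasets.polars.CSVDataSet")
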
191 changes: 191 additions & 0 deletions kedro-datasets/kedro_datasets/polars/csv_dataset.py
@@ -0,0 +1,191 @@
"""``CSVDataSet`` loads/saves data from/to a CSV file using an underlying
filesystem (e.g.: local, S3, GCS). It uses polars to handle the CSV file.
"""
import logging
from copy import deepcopy
from io import BytesIO
from pathlib import PurePosixPath
from typing import Any, Dict

import fsspec
import polars as pl
from kedro.io.core import (
PROTOCOL_DELIMITER,
AbstractVersionedDataSet,
DataSetError,
Version,
get_filepath_str,
get_protocol_and_path,
)

logger = logging.getLogger(__name__)


class CSVDataSet(AbstractVersionedDataSet[pl.DataFrame, pl.DataFrame]):
"""``CSVDataSet`` loads/saves data from/to a CSV file using an underlying
filesystem (e.g.: local, S3, GCS). It uses polars to handle the CSV file.

Example adding a catalog entry with
`YAML API
<https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_:

.. code-block:: yaml

>>> cars:
>>> type: polars.CSVDataSet
>>> filepath: data/01_raw/company/cars.csv
>>> load_args:
>>> sep: ","
>>> parse_dates: False
>>> save_args:
>>> has_header: False
>>> null_value: "somenullstring"
>>>
>>> motorbikes:
>>> type: polars.CSVDataSet
>>> filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
>>> credentials: dev_s3

Example using Python API:
::

>>> from kedro_datasets.polars import CSVDataSet
>>> import polars as pl
>>>
>>> data = pl.DataFrame({'col1': [1, 2], 'col2': [4, 5],
>>> 'col3': [5, 6]})
>>>
>>> data_set = CSVDataSet(filepath="test.csv")
>>> data_set.save(data)
>>> reloaded = data_set.load()
>>> assert data.frame_equal(reloaded)

"""

DEFAULT_LOAD_ARGS = {"rechunk": True} # type: Dict[str, Any]
DEFAULT_SAVE_ARGS = {} # type: Dict[str, Any]

# pylint: disable=too-many-arguments
def __init__(
self,
filepath: str,
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
version: Version = None,
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
) -> None:
"""Creates a new instance of ``CSVDataSet`` pointing to a concrete CSV file
on a specific filesystem.

Args:
filepath: Filepath in POSIX format to a CSV file prefixed with a protocol
`s3://`.
If prefix is not provided, `file` protocol (local filesystem)
will be used.
The prefix should be any protocol supported by ``fsspec``.
Note: `http(s)` doesn't support versioning.
load_args: Polars options for loading CSV files.
Here you can find all available arguments:
https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.read_csv.html#polars.read_csv
All defaults are preserved, but we explicitly use `rechunk=True` for `seaborn`
compatibility.
save_args: Polars options for saving CSV files.
Here you can find all available arguments:
https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.write_csv.html
All defaults are preserved.
version: If specified, should be an instance of
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
credentials: Credentials required to get access to the underlying filesystem.
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
"""
_fs_args = deepcopy(fs_args) or {}
_credentials = deepcopy(credentials) or {}

protocol, path = get_protocol_and_path(filepath, version)
if protocol == "file":
_fs_args.setdefault("auto_mkdir", True)

self._protocol = protocol
self._storage_options = {**_credentials, **_fs_args}
self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

super().__init__(
filepath=PurePosixPath(path),
version=version,
exists_function=self._fs.exists,
glob_function=self._fs.glob,
)

# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)

if "storage_options" in self._save_args or "storage_options" in self._load_args:
logger.warning(
"Dropping 'storage_options' for %s, "
"please specify them under 'fs_args' or 'credentials'.",
self._filepath,
)
self._save_args.pop("storage_options", None)
self._load_args.pop("storage_options", None)

def _describe(self) -> Dict[str, Any]:
return {
"filepath": self._filepath,
"protocol": self._protocol,
"load_args": self._load_args,
"save_args": self._save_args,
"version": self._version,
}

def _load(self) -> pl.DataFrame:
load_path = str(self._get_load_path())
if self._protocol == "file":
# file:// protocol seems to misbehave on Windows
# (<urlopen error file not on local host>),
# so we don't join that back to the filepath;
# storage_options also don't work with local paths
return pl.read_csv(load_path, **self._load_args)

load_path = f"{self._protocol}{PROTOCOL_DELIMITER}{load_path}"
return pl.read_csv(
load_path, storage_options=self._storage_options, **self._load_args
)

def _save(self, data: pl.DataFrame) -> None:
save_path = get_filepath_str(self._get_save_path(), self._protocol)

buf = BytesIO()
data.write_csv(file=buf, **self._save_args)

with self._fs.open(save_path, mode="wb") as fs_file:
fs_file.write(buf.getvalue())

self._invalidate_cache()

def _exists(self) -> bool:
try:
load_path = get_filepath_str(self._get_load_path(), self._protocol)
except DataSetError:
return False

return self._fs.exists(load_path)

def _release(self) -> None:
super()._release()
self._invalidate_cache()

def _invalidate_cache(self) -> None:
"""Invalidate underlying filesystem caches."""
filepath = get_filepath_str(self._filepath, self._protocol)
self._fs.invalidate_cache(filepath)
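
Because `polars.DataFrame.write_csv` targets a file-like object rather than taking fsspec storage options, `_save` serialises into an in-memory buffer and writes the bytes through the fsspec filesystem; `_load` only passes `storage_options` for remote protocols. A standalone sketch of that round trip on the local filesystem, assuming default arguments and an illustrative path (the real dataset additionally resolves versioned paths and invalidates caches):

from io import BytesIO

import fsspec
import polars as pl

data = pl.DataFrame({"col1": [1, 2], "col2": [4, 5]})

# Serialise with polars into memory, then write the bytes through fsspec.
buf = BytesIO()
data.write_csv(file=buf)

fs = fsspec.filesystem("file")
with fs.open("/tmp/cars.csv", mode="wb") as fs_file:
    fs_file.write(buf.getvalue())

# Local paths go straight to polars.read_csv, without storage_options.
reloaded = pl.read_csv("/tmp/cars.csv", rechunk=True)
assert data.frame_equal(reloaded)
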
29 changes: 25 additions & 4 deletions kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -2,6 +2,8 @@
``pyspark``
"""
import json
import logging
import os
from copy import deepcopy
from fnmatch import fnmatch
from functools import partial
@@ -23,6 +25,8 @@
from pyspark.sql.utils import AnalysisException
from s3fs import S3FileSystem

logger = logging.getLogger(__name__)


def _parse_glob_pattern(pattern: str) -> str:
special = ("*", "?", "[")
@@ -114,6 +118,20 @@ def _dbfs_exists(pattern: str, dbutils: Any) -> bool:
return False


def _deployed_on_databricks() -> bool:
"""Check if running on Databricks."""
return "DATABRICKS_RUNTIME_VERSION" in os.environ


def _path_has_dbfs_prefix(path: str) -> bool:
"""Check if a file path has a valid dbfs prefix.

Args:
path: File path to check.
"""
return path.startswith("/dbfs/")


class KedroHdfsInsecureClient(InsecureClient):
"""Subclasses ``hdfs.InsecureClient`` and implements ``hdfs_exists``
and ``hdfs_glob`` methods required by ``SparkDataSet``"""
@@ -240,9 +258,7 @@ def __init__( # pylint: disable=too-many-arguments

Args:
filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks
and working with data written to mount path points,
specify ``filepath``s for (versioned) ``SparkDataSet``s
starting with ``/dbfs/mnt``.
specify ``filepath``s starting with ``/dbfs/``.
file_format: File format used during load and save
operations. Formats supported by the running
SparkContext include parquet, csv, and delta. For a list of supported
@@ -304,7 +320,12 @@

else:
path = PurePosixPath(filepath)

if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
logger.warning(
"Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
"filepath is a known source of error. You must add this prefix to %s",
filepath,
)
if filepath.startswith("/dbfs"):
dbutils = _get_dbutils(self._get_spark())
if dbutils:
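
The Databricks guard added to `SparkDataSet.__init__` boils down to an environment-variable check plus a `/dbfs/` prefix test. A standalone sketch of that check (the wrapper function is illustrative; in the PR the warning is emitted directly from `__init__`):

import logging
import os

logger = logging.getLogger(__name__)


def _deployed_on_databricks() -> bool:
    # Databricks runtimes set this environment variable.
    return "DATABRICKS_RUNTIME_VERSION" in os.environ


def _path_has_dbfs_prefix(path: str) -> bool:
    return path.startswith("/dbfs/")


def warn_if_missing_dbfs_prefix(filepath: str) -> None:
    """Warn when running on Databricks with a filepath outside /dbfs/."""
    if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
        logger.warning(
            "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
            "filepath is a known source of error. You must add this prefix to %s",
            filepath,
        )


warn_if_missing_dbfs_prefix("data/01_raw/cars.parquet")  # warns only on Databricks
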