Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support conforming singer property names to target identifier constraints in SQL sinks #1039

Merged
merged 38 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c0917b0
start on schema and table creation on
Oct 4, 2022
1af0a57
linting
Oct 4, 2022
5e41500
add default schema name
Oct 4, 2022
05ea897
add schema to table metadata
Oct 4, 2022
7bb0e70
Merge branch 'main' into kgpayne/issue1027
Oct 4, 2022
6281e7d
Merge branch 'main' into kgpayne/issue1027
Oct 4, 2022
19b3ccb
Merge branch 'main' into kgpayne/issue1027
Oct 4, 2022
f22fe86
started on conforming columns in SQL streams
Oct 5, 2022
6d7e156
Merge branch 'main' into kgpayne/issue1027
Oct 5, 2022
059301e
Merge branch 'main' into kgpayne/issue1027
edgarrmondragon Oct 5, 2022
e68c045
Add missing import for `singer_sdk.helpers._catalog`
edgarrmondragon Oct 5, 2022
614c4de
Update singer_sdk/sinks/sql.py
Oct 7, 2022
e91ccfd
Update singer_sdk/sinks/sql.py
Oct 7, 2022
68b44a6
Update singer_sdk/sinks/sql.py
Oct 7, 2022
37ec7a6
Update singer_sdk/sinks/sql.py
Oct 7, 2022
0347950
Merge branch 'main' into kgpayne/issue1027
Oct 11, 2022
4db0745
Merge branch 'kgpayne/issue1027' into kgpayne/issue1021
Oct 11, 2022
c7abd72
undo connection module
Oct 11, 2022
c59bd5e
fix copy-paste formatting
Oct 11, 2022
7fd3bb1
fix test
Oct 11, 2022
615e5a6
more connector changes
Oct 11, 2022
4171a95
fix docstring
Oct 11, 2022
5bf574a
Merge branch 'main' into kgpayne/issue1027
Oct 11, 2022
b60ddca
add schema creation test
Oct 12, 2022
1e28606
Merge branch 'kgpayne/issue1027' of github.com:meltano/sdk into kgpay…
Oct 12, 2022
b49ee49
Merge branch 'main' into kgpayne/issue1027
Oct 12, 2022
f244c29
Merge branch 'kgpayne/issue1027' into kgpayne/issue1021
Oct 12, 2022
6cfdfb9
minor fix
Oct 12, 2022
7b2437a
add duplicate conformed key check
Oct 12, 2022
04dd540
Update samples/sample_tap_hostile/hostile_streams.py
Oct 12, 2022
10565a7
add raise on conformed name clash, and leading number conformer
Oct 12, 2022
e65cace
mypy and linting
Oct 12, 2022
d1a12dc
pr suggestion
Oct 12, 2022
3e92f07
Merge branch 'main' into kgpayne/issue1027
Oct 13, 2022
03538d4
Merge branch 'kgpayne/issue1027' into kgpayne/issue1021
Oct 13, 2022
0d7c431
more linting and mypy
Oct 13, 2022
3f9aa9e
Merge branch 'main' into kgpayne/issue1021
Oct 20, 2022
fd1a7a6
fix merge duplicates
Oct 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions samples/sample_tap_hostile/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""A sample tap for testing SQL target property name transformations."""

from .hostile_tap import SampleTapHostile
42 changes: 42 additions & 0 deletions samples/sample_tap_hostile/hostile_streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from __future__ import annotations

import random
import string
from typing import Iterable

from singer_sdk import typing as th
from singer_sdk.streams import Stream


class HostilePropertyNamesStream(Stream):
    """Stream whose property names are hostile to unescaped SQL identifiers.

    The schema declares names containing spaces, dashes, mixed case, a leading
    digit and an emoji, i.e. names that are not compatible as unescaped
    identifiers in common DBMSs.
    """

    name = "hostile_property_names_stream"
    schema = th.PropertiesList(
        th.Property("name with spaces", th.StringType),
        th.Property("NameIsCamelCase", th.StringType),
        th.Property("name-with-dashes", th.StringType),
        th.Property("Name-with-Dashes-and-Mixed-cases", th.StringType),
        th.Property("5name_starts_with_number", th.StringType),
        th.Property("name_with_emoji_😈", th.StringType),
    ).to_dict()

    @staticmethod
    def get_random_lowercase_string() -> str:
        """Return a random string of ten lowercase ASCII letters."""
        return "".join(random.choice(string.ascii_lowercase) for _ in range(10))

    def get_records(self, context: dict | None) -> Iterable[dict | tuple[dict, dict]]:
        """Generate ten records with a random value for every schema property.

        Args:
            context: Stream context; not used by this implementation.

        Returns:
            A generator yielding ten record dictionaries keyed by the hostile
            property names declared in ``schema``.
        """
        return (
            {
                "name with spaces": self.get_random_lowercase_string(),
                "NameIsCamelCase": self.get_random_lowercase_string(),
                "name-with-dashes": self.get_random_lowercase_string(),
                "Name-with-Dashes-and-Mixed-cases": self.get_random_lowercase_string(),
                "5name_starts_with_number": self.get_random_lowercase_string(),
                "name_with_emoji_😈": self.get_random_lowercase_string(),
            }
            for _ in range(10)
        )
24 changes: 24 additions & 0 deletions samples/sample_tap_hostile/hostile_tap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""A sample tap for testing SQL target property name transformations."""

from typing import List

from samples.sample_tap_hostile.hostile_streams import HostilePropertyNamesStream
from singer_sdk import Stream, Tap
from singer_sdk.typing import PropertiesList


class SampleTapHostile(Tap):
    """Sample tap for testing SQL target property name transformations."""

    name: str = "sample-tap-hostile"
    # The tap takes no settings, so its config schema is an empty properties list.
    config_jsonschema = PropertiesList().to_dict()

    def discover_streams(self) -> List[Stream]:
        """Return a list of discovered streams.

        Returns:
            A single-element list holding a ``HostilePropertyNamesStream``
            bound to this tap.
        """
        return [
            HostilePropertyNamesStream(tap=self),
        ]


if __name__ == "__main__":
    # Only invoke the tap's CLI when this module is executed as a script.
    SampleTapHostile.cli()
9 changes: 2 additions & 7 deletions singer_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,8 @@
from singer_sdk.mapper_base import InlineMapper
from singer_sdk.plugin_base import PluginBase
from singer_sdk.sinks import BatchSink, RecordSink, Sink, SQLSink
from singer_sdk.streams import (
GraphQLStream,
RESTStream,
SQLConnector,
SQLStream,
Stream,
)
from singer_sdk.sql import SQLConnector
from singer_sdk.streams import GraphQLStream, RESTStream, SQLStream, Stream
from singer_sdk.tap_base import SQLTap, Tap
from singer_sdk.target_base import SQLTarget, Target

Expand Down
20 changes: 20 additions & 0 deletions singer_sdk/helpers/_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""General helper functions, helper classes, and decorators."""

import json
import re
from pathlib import Path, PurePath
from typing import Any, Dict, Union, cast

Expand All @@ -25,3 +26,22 @@ def read_json_file(path: Union[PurePath, str]) -> Dict[str, Any]:
def utc_now() -> pendulum.DateTime:
    """Return current time in UTC.

    Returns:
        The value of ``pendulum.now`` evaluated with ``tz="UTC"``.
    """
    return pendulum.now(tz="UTC")


def snakecase(string: Any) -> str:
    """Convert string into snake case.

    Dashes, dots and whitespace are replaced with underscores, the first
    character is lowercased, every later uppercase ASCII letter is lowercased
    and prefixed with an underscore, runs of underscores are collapsed to one
    and trailing underscores are removed.

    Args:
        string: String to convert. Other values are passed through ``str()``
            before conversion.

    Returns:
        string: Snake cased string.
    """
    text = re.sub(r"[\-\.\s]", "_", str(string))
    if text:
        # The first character is only lowercased (never prefixed), so the
        # result does not gain a leading underscore from an initial capital.
        head, tail = text[0], text[1:]
        text = head.lower() + re.sub(
            r"[A-Z]", lambda matched: f"_{matched.group(0).lower()}", tail
        )
    # Separator replacement followed by case splitting can yield "__".
    return re.sub(r"_{2,}", "_", text).rstrip("_")
19 changes: 18 additions & 1 deletion singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(
self.latest_state: dict | None = None
self._draining_state: dict | None = None
self.drained_state: dict | None = None
self.key_properties = key_properties or []
self._key_properties = key_properties or []

# Tally counters
self._total_records_written: int = 0
Expand Down Expand Up @@ -202,6 +202,15 @@ def datetime_error_treatment(self) -> DatetimeErrorTreatmentEnum:
"""
return DatetimeErrorTreatmentEnum.ERROR

@property
def key_properties(self) -> list[str]:
    """Return key properties.

    Read-only accessor for the value held in ``self._key_properties``.

    Returns:
        A list of stream key properties.
    """
    return self._key_properties

# Record processing

def _add_sdc_metadata_to_record(
Expand Down Expand Up @@ -423,6 +432,14 @@ def activate_version(self, new_version: int) -> None:
"Ignoring."
)

def setup(self) -> None:
"""Perform any setup actions at the beginning of a Stream.

Setup is executed once per Sink instance, after instantiation. If a Schema
change is detected, a new Sink is instantiated and this method is called again.
"""
pass

def clean_up(self) -> None:
"""Perform any clean up actions required at end of a stream.

Expand Down
Loading