Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make conform_record_data_types work on nested objects and arrays #887

Merged
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3d15f91
Make conform_record_data_types work on nested objects and arrays
Jack-Burnett Aug 5, 2022
2d8bf6d
Merge branch 'main' into 800-recursive-type-conformance
Jack-Burnett Aug 5, 2022
34f9d78
Undo format changes
Jack-Burnett Aug 5, 2022
7137643
Merge branch '800-recursive-type-conformance' of https://github.com/J…
Jack-Burnett Aug 5, 2022
0b9fa9e
Add support for mixed arrays
Jack-Burnett Aug 5, 2022
19bd167
Fix formatting
Jack-Burnett Aug 5, 2022
8e588bb
Add testing for primitive coversions
Jack-Burnett Aug 5, 2022
97109b8
format
Jack-Burnett Aug 5, 2022
4f274bd
Remove debug line
Jack-Burnett Aug 5, 2022
c48e36d
Merge branch 'main' into 800-recursive-type-conformance
Jack-Burnett Aug 8, 2022
dd54125
Merge branch 'main' into 800-recursive-type-conformance
edgarrmondragon Aug 8, 2022
8801f8a
Merge branch 'main' into 800-recursive-type-conformance
edgarrmondragon Aug 8, 2022
6deb7ec
Merge branch 'main' into 800-recursive-type-conformance
Jack-Burnett Aug 25, 2022
4adb38b
Merge branch 'main' into 800-recursive-type-conformance
Jack-Burnett Sep 11, 2022
6f99bda
Merge branch 'main' into 800-recursive-type-conformance
Jack-Burnett Sep 26, 2022
fa30578
Merge properly
Jack-Burnett Sep 26, 2022
20866a3
fix pointless logging
Jack-Burnett Oct 27, 2022
7bb99fe
Merge branch 'main' into 800-recursive-type-conformance
Jack-Burnett Nov 15, 2022
8e1449e
dedupe
Jack-Burnett Nov 15, 2022
a70d872
lint
Jack-Burnett Nov 15, 2022
9968512
Merge branch 'main' into 800-recursive-type-conformance
cjohnhanson Nov 17, 2022
a9d9246
Merge branch 'main' into 800-recursive-type-conformance
Jack-Burnett Nov 18, 2022
3cc2323
Merge branch 'main' into 800-recursive-type-conformance
edgarrmondragon Dec 14, 2022
c84b7df
Merge branch 'main' into 800-recursive-type-conformance
edgarrmondragon Dec 19, 2022
6aa8bf7
Merge branch 'main' into 800-recursive-type-conformance
edgarrmondragon Dec 22, 2022
c91949a
Add conformance levels
Jack-Burnett Jan 3, 2023
d892e38
flake8
Jack-Burnett Jan 3, 2023
c914ab6
Merge branch 'main' into 800-recursive-type-conformance
Jack-Burnett Jan 3, 2023
7cbf787
Merge branch 'main' into 800-recursive-type-conformance
edgarrmondragon Jan 3, 2023
02291f6
Merge branch 'main' into 800-recursive-type-conformance
edgarrmondragon Jan 3, 2023
31a4d73
Use `caplog` fixture
edgarrmondragon Jan 3, 2023
69f0b18
Rename `ConformanceLevel` to `TypeConformanceLevel`
edgarrmondragon Jan 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 107 additions & 38 deletions singer_sdk/helpers/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@ def is_object_type(property_schema: dict) -> Optional[bool]:
return False


def is_uniform_list(property_schema: dict) -> Optional[bool]:
    """Tell whether the schema describes an array whose items share one schema.

    Tuple-style arrays, where different indices have different schemas, do not
    count as uniform; see
    https://json-schema.org/understanding-json-schema/reference/array.html#array
    """
    if is_array_type(property_schema) is not True:
        return False
    if "prefixItems" in property_schema:
        return False
    # A missing "items" key yields None here, which is not a dict.
    return isinstance(property_schema.get("items"), dict)


def is_datetime_type(type_dict: dict) -> bool:
"""Return True if JSON Schema type definition is a 'date-time' type.

Expand Down Expand Up @@ -214,7 +228,7 @@ def is_string_array_type(type_dict: dict) -> bool:


def is_array_type(type_dict: dict) -> bool:
"""Return True if JSON Schema type definition is a string array."""
"""Return True if JSON Schema type is an array."""
if not type_dict:
raise ValueError(
"Could not detect type from empty type_dict. "
Expand Down Expand Up @@ -264,7 +278,7 @@ def is_string_type(property_schema: dict) -> Optional[bool]:
def _warn_unmapped_properties(
    stream_name: str, property_names: Tuple[str, ...], logger: logging.Logger
) -> None:
    """Log a single warning listing properties missing from the catalog schema.

    Args:
        stream_name: Name of the stream the properties were found in.
        property_names: The unmapped property names, rendered verbatim in the
            message as a tuple.
        logger: Logger that receives the warning.
    """
    logger.warning(
        f"Properties {property_names} were present in the '{stream_name}' stream but "
        "not found in catalog schema. Ignoring."
    )
Expand All @@ -275,46 +289,101 @@ def conform_record_data_types( # noqa: C901
) -> Dict[str, Any]:
"""Translate values in record dictionary to singer-compatible data types.

Any property names not found in the schema catalog will be removed, and a
warning will be logged exactly once per unmapped property name.
Any property names not found in the schema catalog will be removed, and a single
warning will be logged listing each unmapped property name.
"""
rec: Dict[str, Any] = {}
rec, unmapped_properties = _conform_record_data_types(record, schema, None)

if len(unmapped_properties) > 0:
_warn_unmapped_properties(stream_name, tuple(unmapped_properties), logger)

return rec


def _conform_record_data_types(  # noqa: C901
    input_object: Dict[str, Any], schema: dict, parent: Optional[str]
) -> Tuple[Dict[str, Any], List[str]]:
    """Translate values in a record dictionary to singer-compatible data types.

    Property names not found in ``schema["properties"]`` are left out of the
    output, and their paths are collected and returned to the caller. This
    function does not log anything itself.

    This is called recursively to process nested objects and arrays.

    Args:
        input_object: A single record, or a nested object within a record.
        schema: JSON schema the given input_object is expected to meet.
        parent: '.' separated path to this element from the object root (used to
            build the paths of unmapped properties), or None at the root.

    Returns:
        A tuple of the conformed object and the list of unmapped property paths.
    """
    output_object: Dict[str, Any] = {}
    unmapped_properties: List[str] = []
    for property_name, elem in input_object.items():
        property_path = (
            property_name if parent is None else parent + "." + property_name
        )
        if property_name not in schema["properties"]:
            unmapped_properties.append(property_path)
            continue

        property_schema = schema["properties"][property_name]
        if isinstance(elem, list) and is_uniform_list(property_schema):
            item_schema = property_schema["items"]
            output = []
            for item in elem:
                # Recurse only when the item schema actually declares
                # "properties" (same guard as the nested-object branch below);
                # otherwise the recursive call would fail looking them up.
                if (
                    is_object_type(item_schema)
                    and isinstance(item, dict)
                    and "properties" in item_schema
                ):
                    output_item, sub_unmapped_properties = _conform_record_data_types(
                        item, item_schema, property_path
                    )
                    unmapped_properties.extend(sub_unmapped_properties)
                    output.append(output_item)
                else:
                    output.append(_conform_primitive_property(item, item_schema))
            output_object[property_name] = output
        elif (
            isinstance(elem, dict)
            and is_object_type(property_schema)
            and "properties" in property_schema
        ):
            (
                output_object[property_name],
                sub_unmapped_properties,
            ) = _conform_record_data_types(elem, property_schema, property_path)
            unmapped_properties.extend(sub_unmapped_properties)
        else:
            output_object[property_name] = _conform_primitive_property(
                elem, property_schema
            )
    return output_object, unmapped_properties


def _conform_primitive_property(elem: Any, property_schema: dict) -> Any:
    """Convert a primitive (i.e. not object or array) to a JSON compatible type.

    Conversions, checked in this order:

    * datetime values are passed through ``to_json_compatible``;
    * dates become their ISO form with a ``T00:00:00+00:00`` suffix;
    * timedeltas become the ISO timestamp of "Unix epoch + delta" with a
      ``+00:00`` suffix;
    * times become ``str(elem)``;
    * bytes become a bool (anything other than ``b"\\x00"`` is True) when the
      schema is a boolean type, otherwise their hex string;
    * for boolean schemas, None stays None, a value equal to 0 becomes False and
      anything else becomes True;
    * every other value is returned unchanged.

    Args:
        elem: The value to convert.
        property_schema: JSON schema of the property holding the value.

    Returns:
        The converted value.
    """
    if isinstance(elem, (datetime.datetime, pendulum.DateTime)):
        return to_json_compatible(elem)
    if isinstance(elem, datetime.date):
        return elem.isoformat() + "T00:00:00+00:00"
    if isinstance(elem, datetime.timedelta):
        # Naive Unix epoch, spelled out instead of utcfromtimestamp(0), which is
        # deprecated as of Python 3.12; the resulting value is identical.
        epoch = datetime.datetime(1970, 1, 1)
        return (epoch + elem).isoformat() + "+00:00"
    if isinstance(elem, datetime.time):
        return str(elem)
    if isinstance(elem, bytes):
        if is_boolean_type(property_schema):
            # for BIT value, treat 0 as False and anything else as True
            return elem != b"\x00"
        return elem.hex()
    if is_boolean_type(property_schema):
        if elem is None:
            return None
        return not (elem == 0)
    return elem
198 changes: 198 additions & 0 deletions tests/core/test_typing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
"""Test _typing - specifically conform_record_data_types()."""
import datetime
import logging
import unittest

from singer_sdk.helpers._typing import (
_conform_primitive_property,
conform_record_data_types,
)
from singer_sdk.typing import (
ArrayType,
BooleanType,
PropertiesList,
Property,
StringType,
)

logger = logging.getLogger("log")


def test_simple_schema_conforms_types():
    """Bytes values under top-level boolean properties come back as bools."""
    bool_schema = PropertiesList(
        Property("true", BooleanType),
        Property("false", BooleanType),
    ).to_dict()
    raw_record = {"true": b"\x01", "false": b"\x00"}

    conformed = conform_record_data_types(
        "test_stream", raw_record, bool_schema, logger
    )

    assert conformed == {"true": True, "false": False}


def test_primitive_arrays_are_conformed():
    """Bytes items of a boolean array are each conformed to a bool."""
    array_schema = PropertiesList(
        Property("list", ArrayType(BooleanType)),
    ).to_dict()
    raw_record = {"list": [b"\x01", b"\x00"]}

    conformed = conform_record_data_types(
        "test_stream", raw_record, array_schema, logger
    )

    assert conformed == {"list": [True, False]}


def test_object_arrays_are_conformed():
    """Properties of objects held in an array are conformed."""
    item_type = PropertiesList(Property("value", BooleanType))
    array_schema = PropertiesList(
        Property("list", ArrayType(item_type)),
    ).to_dict()
    raw_record = {"list": [{"value": b"\x01"}, {"value": b"\x00"}]}

    conformed = conform_record_data_types(
        "test_stream", raw_record, array_schema, logger
    )

    assert conformed == {"list": [{"value": True}, {"value": False}]}


def test_mixed_arrays_are_conformed():
    """An array mixing objects and primitives has both kinds of item conformed."""
    item_schema = {
        "type": ["object", "boolean"],
        "properties": {"value": {"type": ["boolean", "null"]}},
    }
    list_schema = {"type": ["array", "null"], "items": item_schema}
    record_schema = {"type": "object", "properties": {"list": list_schema}}
    raw_record = {"list": [{"value": b"\x01"}, b"\x00"]}

    conformed = conform_record_data_types(
        "test_stream", raw_record, record_schema, logger
    )

    assert conformed == {"list": [{"value": True}, False]}


def test_nested_objects_are_conformed():
    """Properties of a nested object are conformed."""
    inner_type = PropertiesList(Property("value", BooleanType))
    nested_schema = PropertiesList(
        Property("object", inner_type),
    ).to_dict()
    raw_record = {"object": {"value": b"\x01"}}

    conformed = conform_record_data_types(
        "test_stream", raw_record, nested_schema, logger
    )

    assert conformed == {"object": {"value": True}}


class TestSimpleEval(unittest.TestCase):
    """Check that unmapped properties are dropped and reported in one warning."""

    def _assert_conforms_with_warning(
        self, schema, record, expected_output, expected_logs
    ):
        """Conform ``record`` and compare both the output and the captured logs."""
        with self.assertLogs("log", level="WARN") as logs:
            actual_output = conform_record_data_types(
                "test_stream", record, schema, logger
            )
        self.assertEqual(actual_output, expected_output)
        self.assertEqual(logs.output, expected_logs)

    def test_simple_schema_removes_types(self):
        """A top-level property absent from the schema is removed."""
        schema = PropertiesList(
            Property("keep", StringType),
        ).to_dict()

        self._assert_conforms_with_warning(
            schema,
            {"keep": "hello", "remove": "goodbye"},
            {"keep": "hello"},
            [
                "WARNING:log:Properties ('remove',) were present in the 'test_stream' stream but not found in catalog "
                "schema. Ignoring."
            ],
        )

    def test_nested_objects_remove_types(self):
        """An unmapped property of a nested object is removed and reported by path."""
        schema = PropertiesList(
            Property("object", PropertiesList(Property("keep", StringType))),
        ).to_dict()

        self._assert_conforms_with_warning(
            schema,
            {"object": {"keep": "hello", "remove": "goodbye"}},
            {"object": {"keep": "hello"}},
            [
                "WARNING:log:Properties ('object.remove',) were present in the 'test_stream' stream but not found in "
                "catalog schema. Ignoring."
            ],
        )

    def test_object_arrays_remove_types(self):
        """An unmapped property of an object inside an array is removed."""
        schema = PropertiesList(
            Property("list", ArrayType(PropertiesList(Property("keep", StringType)))),
        ).to_dict()

        self._assert_conforms_with_warning(
            schema,
            {"list": [{"keep": "hello", "remove": "goodbye"}]},
            {"list": [{"keep": "hello"}]},
            [
                "WARNING:log:Properties ('list.remove',) were present in the 'test_stream' stream but not found in "
                "catalog schema. Ignoring."
            ],
        )


def test_conform_primitives():
    """Check each primitive conversion performed by _conform_primitive_property."""
    # Date/time values are rendered as strings.
    assert (
        _conform_primitive_property(datetime.datetime(2020, 5, 17), {"type": "string"})
        == "2020-05-17T00:00:00+00:00"
    )
    assert (
        _conform_primitive_property(datetime.date(2020, 5, 17), {"type": "string"})
        == "2020-05-17T00:00:00+00:00"
    )
    assert (
        _conform_primitive_property(datetime.timedelta(365), {"type": "string"})
        == "1971-01-01T00:00:00+00:00"
    )
    assert (
        _conform_primitive_property(datetime.time(12, 0, 0), {"type": "string"})
        == "12:00:00"
    )

    # Bytes under a non-boolean schema become a hex string.
    assert _conform_primitive_property(b"\x00", {"type": "string"}) == "00"
    assert _conform_primitive_property(b"\xBC", {"type": "string"}) == "bc"

    # Boolean results are compared by identity so that a merely truthy or falsy
    # value (e.g. 1 or 0) cannot satisfy the assertion.
    assert _conform_primitive_property(b"\x00", {"type": "boolean"}) is False
    assert _conform_primitive_property(b"\xBC", {"type": "boolean"}) is True

    assert _conform_primitive_property(None, {"type": "boolean"}) is None
    assert _conform_primitive_property(0, {"type": "boolean"}) is False
    assert _conform_primitive_property(1, {"type": "boolean"}) is True