Skip to content

Commit

Permalink
fix(mappers): Schema passthrough for whitelisted fields (#1219)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon authored Nov 29, 2022
1 parent 417e75a commit d23ebbd
Show file tree
Hide file tree
Showing 16 changed files with 191 additions and 89 deletions.
33 changes: 32 additions & 1 deletion docs/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,43 @@ Supported platform markers are `windows`, `darwin`, and `linux`.
### Snapshot Testing

We use [pytest-snapshot](https://pypi.org/project/pytest-snapshot/) for snapshot testing.
To update snapshots, run:

#### Adding a new snapshot

To add a new snapshot, use the `snapshot` fixture and mark the test with the
`@pytest.mark.snapshot` decorator. When the tests are run with the
`--snapshot-update` flag, the fixture will create a new snapshot file if one
does not already exist. On a normal run, the fixture will compare the existing
snapshot to the actual value and fail the test if they do not match, or if the
snapshot file is missing.

The `tests/snapshots` directory is where snapshot files should be stored and
it's available as the `snapshot_dir` fixture.

```python
@pytest.mark.snapshot
def test_snapshot(snapshot, snapshot_dir):
    # Configure the snapshot directory
    snapshot.snapshot_dir = snapshot_dir.joinpath("test_snapshot_subdir")

    snapshot_name = "test_snapshot"
    expected_content = "Hello, World!"
    snapshot.assert_match(expected_content, snapshot_name)
```

#### Generating or updating snapshots

To update or generate snapshots, run the nox `update_snapshots` session:

```bash
nox -rs update_snapshots
```

or pass the `--snapshot-update` flag to pytest directly:

```bash
poetry run pytest --snapshot-update -m 'snapshot'
```

This will run all tests with the `snapshot` marker and update any snapshots that have changed.
Commit the updated snapshots to your branch if they are expected to change.

Expand Down
9 changes: 5 additions & 4 deletions singer_sdk/helpers/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ def append_type(type_dict: dict, new_type: str) -> dict:
return result

elif "type" in result:
if isinstance(result["type"], list) and new_type not in result["type"]:
result["type"].append(new_type)
elif new_type != result["type"]:
result["type"] = [result["type"], new_type]
type_array = (
result["type"] if isinstance(result["type"], list) else [result["type"]]
)
if new_type not in type_array:
result["type"] = [*type_array, new_type]
return result

raise ValueError(
Expand Down
7 changes: 5 additions & 2 deletions singer_sdk/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,11 @@ def _init_functions_and_schema(
transformed_schema["properties"].pop(prop_key, None)
elif isinstance(prop_def, str):
default_type: JSONTypeHelper = StringType() # Fallback to string
existing_schema: dict = transformed_schema["properties"].get(
prop_key, {}
existing_schema: dict = (
# Use transformed schema if available
transformed_schema["properties"].get(prop_key, {})
# ...or original schema for passthrough
or self.raw_schema["properties"].get(prop_def, {})
)
if existing_schema:
# Set default type if property exists already in JSON Schema
Expand Down
147 changes: 65 additions & 82 deletions tests/core/test_mapper.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
"""Test map transformer."""

import copy
import io
import json
import logging
from typing import Dict, List, Optional, Set, cast
from contextlib import redirect_stdout
from pathlib import Path
from typing import Dict, List, Optional

import pytest
from freezegun import freeze_time
from pytest_snapshot.plugin import Snapshot

from singer_sdk._singerlib import Catalog
from singer_sdk.exceptions import MapExpressionError
Expand Down Expand Up @@ -444,31 +449,30 @@ def clear_schema_cache() -> None:
get_selected_schema.cache_clear()


@freeze_time("2022-01-01T00:00:00Z")
@pytest.mark.snapshot
@pytest.mark.parametrize(
"stream_alias,stream_maps,flatten,flatten_max_depth,output_fields,key_properties",
"stream_maps,flatten,flatten_max_depth,snapshot_name",
[
(
"mystream",
pytest.param(
{},
False,
0,
{"email", "count", "user"},
[],
"no_map.jsonl",
id="no_map",
),
(
"mystream",
pytest.param(
{
"mystream": {
"email_hash": "md5(email)",
}
},
False,
0,
{"email", "count", "user", "email_hash"},
[],
"keep_all_fields.jsonl",
id="keep_all_fields",
),
(
"mystream",
pytest.param(
{
"mystream": {
"email_hash": "md5(email)",
Expand All @@ -478,11 +482,10 @@ def clear_schema_cache() -> None:
},
False,
0,
{"fixed_count", "email_hash"},
[],
"only_mapped_fields.jsonl",
id="only_mapped_fields",
),
(
"mystream",
pytest.param(
{
"mystream": {
"email_hash": "md5(email)",
Expand All @@ -492,51 +495,45 @@ def clear_schema_cache() -> None:
},
False,
0,
{"email_hash"},
["email_hash"],
"changed_key_properties.jsonl",
id="changed_key_properties",
),
(
"sourced_stream_1",
pytest.param(
{"mystream": None, "sourced_stream_1": {"__source__": "mystream"}},
False,
0,
{"email", "count", "user"},
[],
"sourced_stream_1.jsonl",
id="sourced_stream_1",
),
(
"sourced_stream_2",
pytest.param(
{"sourced_stream_2": {"__source__": "mystream"}, "__else__": None},
False,
0,
{"email", "count", "user"},
[],
"sourced_stream_2.jsonl",
id="sourced_stream_2",
),
(
"aliased_stream",
pytest.param(
{"mystream": {"__alias__": "aliased_stream"}},
False,
0,
{"email", "count", "user"},
[],
"aliased_stream.jsonl",
id="aliased_stream",
),
(
"mystream",
pytest.param(
{},
True,
1,
{"email", "count", "user__id", "user__sub"},
[],
"flatten_depth_1.jsonl",
id="flatten_depth_1",
),
(
"mystream",
pytest.param(
{},
True,
10,
{"email", "count", "user__id", "user__sub__num"},
[],
"flatten_all.jsonl",
id="flatten_all",
),
(
"mystream",
pytest.param(
{
"mystream": {
"email_hash": "md5(email)",
Expand All @@ -545,69 +542,55 @@ def clear_schema_cache() -> None:
},
True,
10,
{"email", "count", "email_hash", "user__id", "user__sub__num"},
["email_hash"],
"map_and_flatten.jsonl",
id="map_and_flatten",
),
(
"mystream",
pytest.param(
{
"mystream": {
"email": None,
}
},
False,
0,
{"count", "user"},
[],
"drop_property.jsonl",
id="drop_property",
),
pytest.param(
{
"mystream": {
"count": "count",
"__else__": None,
}
},
False,
0,
"non_pk_passthrough.jsonl",
id="non_pk_passthrough",
),
],
ids=[
"no_map",
"keep_all_fields",
"only_mapped_fields",
"changed_key_properties",
"sourced_stream_1",
"sourced_stream_2",
"aliased_stream",
"flatten_depth_1",
"flatten_all",
"map_and_flatten",
"drop_property",
],
)
def test_mapped_stream(
snapshot: Snapshot,
snapshot_dir: Path,
clear_schema_cache: None,
stream_alias: str,
stream_maps: dict,
flatten: bool,
flatten_max_depth: Optional[int],
output_fields: Set[str],
key_properties: List[str],
snapshot_name: str,
):
snapshot.snapshot_dir = snapshot_dir.joinpath("mapped_stream")

tap = MappedTap(
config={
"stream_maps": stream_maps,
"flattening_enabled": flatten,
"flattening_max_depth": flatten_max_depth,
}
)
stream = tap.streams["mystream"]

schema_messages = list(stream._generate_schema_messages())
assert len(schema_messages) == 1, "Incorrect number of schema messages generated."
schema_message = schema_messages[0]
assert schema_message.stream == stream_alias
assert schema_message.key_properties == key_properties
assert schema_message.schema["properties"].keys() == output_fields

for raw_record in stream.get_records(None):
record_message = next(stream._generate_record_messages(cast(dict, raw_record)))
transformed_record = record_message.record

assert record_message.stream == stream_alias
assert output_fields == set(transformed_record.keys())

for output_field in output_fields:
assert transformed_record[
output_field
], f"Value for field '{output_field}' should be nonempty."
buf = io.StringIO()
with redirect_stdout(buf):
tap.sync_all()

buf.seek(0)
snapshot.assert_match(buf.read(), snapshot_name)
7 changes: 7 additions & 0 deletions tests/snapshots/mapped_stream/aliased_stream.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"type": "SCHEMA", "stream": "aliased_stream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "aliased_stream", "record": {"email": "[email protected]", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {"starting_replication_value": null}}}}
{"type": "RECORD", "stream": "aliased_stream", "record": {"email": "[email protected]", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "aliased_stream", "record": {"email": "[email protected]", "count": 19, "user": {"id": 3, "sub": {"num": 3}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
7 changes: 7 additions & 0 deletions tests/snapshots/mapped_stream/changed_key_properties.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"email_hash": {"type": ["string", "null"]}}}, "key_properties": ["email_hash"]}
{"type": "RECORD", "stream": "mystream", "record": {"email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {"starting_replication_value": null}}}}
{"type": "RECORD", "stream": "mystream", "record": {"email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email_hash": "426b189df1e2f359efe6ee90f2d2030f"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
7 changes: 7 additions & 0 deletions tests/snapshots/mapped_stream/drop_property.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {"starting_replication_value": null}}}}
{"type": "RECORD", "stream": "mystream", "record": {"count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 19, "user": {"id": 3, "sub": {"num": 3}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
7 changes: 7 additions & 0 deletions tests/snapshots/mapped_stream/flatten_all.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub__num": {"type": ["integer", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user__id": 1, "user__sub__num": 1}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {"starting_replication_value": null}}}}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user__id": 2, "user__sub__num": 2}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user__id": 3, "user__sub__num": 3}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
7 changes: 7 additions & 0 deletions tests/snapshots/mapped_stream/flatten_depth_1.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user__id": 1, "user__sub": "{\"num\": 1}"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {"starting_replication_value": null}}}}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user__id": 2, "user__sub": "{\"num\": 2}"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user__id": 3, "user__sub": "{\"num\": 3}"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
7 changes: 7 additions & 0 deletions tests/snapshots/mapped_stream/keep_all_fields.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}, "email_hash": {"type": ["string", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user": {"id": 1, "sub": {"num": 1}}, "email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {"starting_replication_value": null}}}}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user": {"id": 2, "sub": {"num": 2}}, "email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user": {"id": 3, "sub": {"num": 3}}, "email_hash": "426b189df1e2f359efe6ee90f2d2030f"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
7 changes: 7 additions & 0 deletions tests/snapshots/mapped_stream/map_and_flatten.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub__num": {"type": ["integer", "null"]}, "email_hash": {"type": ["string", "null"]}}, "type": "object"}, "key_properties": ["email_hash"]}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user__id": 1, "user__sub__num": 1, "email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {"starting_replication_value": null}}}}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user__id": 2, "user__sub__num": 2, "email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user__id": 3, "user__sub__num": 3, "email_hash": "426b189df1e2f359efe6ee90f2d2030f"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
7 changes: 7 additions & 0 deletions tests/snapshots/mapped_stream/no_map.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {"starting_replication_value": null}}}}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user": {"id": 3, "sub": {"num": 3}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
7 changes: 7 additions & 0 deletions tests/snapshots/mapped_stream/non_pk_passthrough.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"count": {"type": ["integer", "null"]}}}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"count": 21}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {"starting_replication_value": null}}}}
{"type": "RECORD", "stream": "mystream", "record": {"count": 13}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 19}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
7 changes: 7 additions & 0 deletions tests/snapshots/mapped_stream/only_mapped_fields.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"email_hash": {"type": ["string", "null"]}, "fixed_count": {"type": ["integer", "null"]}}}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email_hash": "c160f8cc69a4f0bf2b0362752353d060", "fixed_count": 20}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {"starting_replication_value": null}}}}
{"type": "RECORD", "stream": "mystream", "record": {"email_hash": "4b9bb80620f03eb3719e0a061c14283d", "fixed_count": 12}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email_hash": "426b189df1e2f359efe6ee90f2d2030f", "fixed_count": 18}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
Loading

0 comments on commit d23ebbd

Please sign in to comment.