Skip to content

Commit

Permalink
feat(targets): Support a x-sql-datatype JSON Schema annotation to l…
Browse files Browse the repository at this point in the history
…et targets customize SQL type handling (#2829)

* feat(targets): Support a `x-sql-datatype` annotation to let targets customize SQL type handling

* docs: Document `x-sql-datatype` usage

* docs: Add example to method docstring

* test: Test the case where no handler for the x-sql-datatype is available
  • Loading branch information
edgarrmondragon authored Jan 15, 2025
1 parent 8638b07 commit b3610ca
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 10 deletions.
53 changes: 53 additions & 0 deletions docs/guides/sql-target.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,56 @@ class MyConnector(SQLConnector):
to_sql.register_format_handler("uri", URI)
return to_sql
```

### Use the `x-sql-datatype` JSON Schema extension

You can register new type handlers for the `x-sql-datatype` extension:

```python
from my_sqlalchemy_dialect import URI


class MyConnector(SQLConnector):
    """Example connector that maps ``x-sql-datatype: smallint`` to ``SMALLINT``."""

    @functools.cached_property
    def jsonschema_to_sql(self):
        to_sql = JSONSchemaToSQL()
        # Map the "smallint" annotation value to the SQLAlchemy SMALLINT type.
        to_sql.register_sql_datatype_handler("smallint", sa.types.SMALLINT)
        return to_sql
```

Then you can annotate the tap's catalog to specify the SQL type:

````{tab} meltano.yml
```yaml
# https://docs.meltano.com/concepts/plugins/#schema-extra
plugins:
  extractors:
  - name: tap-example
    schema:
      addresses:
        number:
          x-sql-datatype: smallint
```
````

````{tab} JSON catalog
```json
{
"streams": [
{
"stream": "addresses",
"tap_stream_id": "addresses",
"schema": {
"type": "object",
"properties": {
"number": {
"type": "integer",
"x-sql-datatype": "smallint"
}
}
}
}
]
}
```
````
24 changes: 18 additions & 6 deletions singer_sdk/_singerlib/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
"anyOf",
"patternProperties",
"allOf",
# JSON Schema extensions
"x-sql-datatype",
]


Expand Down Expand Up @@ -84,6 +86,9 @@ class Schema:
contentMediaType: str | None = None # noqa: N815
contentEncoding: str | None = None # noqa: N815

# JSON Schema extensions
x_sql_datatype: str | None = None

def to_dict(self) -> dict[str, t.Any]:
"""Return the raw JSON Schema as a (possibly nested) dict.
Expand All @@ -99,12 +104,14 @@ def to_dict(self) -> dict[str, t.Any]:
result["items"] = self.items.to_dict()

for key in STANDARD_KEYS:
if self.__dict__.get(key) is not None:
result[key] = self.__dict__[key]
attr = key.replace("-", "_")
if (val := self.__dict__.get(attr)) is not None:
result[key] = val

for key in META_KEYS:
if self.__dict__.get(key) is not None:
result[f"${key}"] = self.__dict__[key]
attr = key.replace("-", "_")
if (val := self.__dict__.get(attr)) is not None:
result[f"${key}"] = val

return result

Expand Down Expand Up @@ -142,6 +149,7 @@ def from_dict(
... "description": "Age in years which must be equal to or greater than zero.",
... "type": "integer",
... "minimum": 0,
... "x-sql-datatype": "smallint",
... },
... },
... "required": ["firstName", "lastName"],
Expand All @@ -153,6 +161,8 @@ def from_dict(
"The person's first name."
>>> schema.properties["age"].minimum
0
>>> schema.properties["age"].x_sql_datatype
'smallint'
>>> schema.schema
'http://json-schema.org/draft/2020-12/schema'
""" # noqa: E501
Expand All @@ -168,12 +178,14 @@ def from_dict(
kwargs["items"] = cls.from_dict(items, **schema_defaults)

for key in STANDARD_KEYS:
attr = key.replace("-", "_")
if key in data:
kwargs[key] = data[key]
kwargs[attr] = data[key]

for key in META_KEYS:
attr = key.replace("-", "_")
if f"${key}" in data:
kwargs[key] = data[f"${key}"]
kwargs[attr] = data[f"${key}"]

return cls(**kwargs)

Expand Down
43 changes: 39 additions & 4 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ class JSONSchemaToSQL:
This class provides a mapping from JSON Schema types to SQLAlchemy types.
.. versionadded:: 0.42.0
.. versionchanged:: 0.44.0
Added the
:meth:`singer_sdk.connectors.sql.JSONSchemaToSQL.register_sql_datatype_handler`
method to map custom ``x-sql-datatype`` annotations into SQLAlchemy types.
"""

def __init__(self, *, max_varchar_length: int | None = None) -> None:
Expand Down Expand Up @@ -276,6 +280,8 @@ def __init__(self, *, max_varchar_length: int | None = None) -> None:
"ipv6": lambda _: sa.types.VARCHAR(45),
}

self._sql_datatype_mapping: dict[str, JSONtoSQLHandler] = {}

self._fallback_type: type[sa.types.TypeEngine] = sa.types.VARCHAR

def _invoke_handler( # noqa: PLR6301
Expand Down Expand Up @@ -338,6 +344,25 @@ def register_format_handler(
""" # noqa: E501
self._format_handlers[format_name] = handler

def register_sql_datatype_handler(
    self,
    sql_datatype: str,
    handler: JSONtoSQLHandler,
) -> None:
    """Register a custom ``x-sql-datatype`` handler.

    The handler is stored under ``sql_datatype``; registering the same name
    again replaces the previously registered handler.

    Args:
        sql_datatype: The x-sql-datatype string.
        handler: Either a SQLAlchemy type class or a callable that takes a schema
            dict and returns a SQLAlchemy type instance.

    Example:
        >>> from sqlalchemy.types import SMALLINT
        >>> to_sql = JSONSchemaToSQL()
        >>> to_sql.register_sql_datatype_handler("smallint", SMALLINT)
    """
    self._sql_datatype_mapping[sql_datatype] = handler

def handle_multiple_types(self, types: t.Sequence[str]) -> sa.types.TypeEngine: # noqa: ARG002, PLR6301
"""Handle multiple types by returning a VARCHAR.
Expand Down Expand Up @@ -374,10 +399,20 @@ def _get_type_from_schema(self, schema: dict) -> sa.types.TypeEngine | None:
Returns:
SQL type if one can be determined, None otherwise.
"""
# Check if this is a string with format first
if schema.get("type") == "string" and "format" in schema:
format_type = self._handle_format(schema)
if format_type is not None:
# Check x-sql-datatype first
if x_sql_datatype := schema.get("x-sql-datatype"):
if handler := self._sql_datatype_mapping.get(x_sql_datatype):
return self._invoke_handler(handler, schema)

warnings.warn(
f"This target does not support the x-sql-datatype '{x_sql_datatype}'",
UserWarning,
stacklevel=2,
)

# Check if this is a string with format then
if schema.get("type") == "string" and "format" in schema: # noqa: SIM102
if (format_type := self._handle_format(schema)) is not None:
return format_type

# Then check regular types
Expand Down
16 changes: 16 additions & 0 deletions tests/core/test_connector_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,22 @@ def handle_raw_string(self, schema):
result = json_schema_to_sql.to_sql_type(image_type)
assert isinstance(result, sa.types.LargeBinary)

def test_annotation_sql_datatype(self):
    """Check ``x-sql-datatype`` handling for registered and unknown values."""
    json_schema_to_sql = JSONSchemaToSQL()
    json_schema_to_sql.register_sql_datatype_handler("json", sa.types.JSON)

    # A registered annotation decides the SQL type even though the JSON
    # Schema type is "string".
    jsonschema_type = {"type": ["string"], "x-sql-datatype": "json"}
    result = json_schema_to_sql.to_sql_type(jsonschema_type)
    assert isinstance(result, sa.types.JSON)

    # An annotation with no registered handler must emit a warning ...
    unknown_type = {"type": ["string"], "x-sql-datatype": "unknown"}
    with pytest.warns(
        UserWarning,
        match="This target does not support the x-sql-datatype",
    ):
        result = json_schema_to_sql.to_sql_type(unknown_type)

    # ... and the conversion still yields a VARCHAR for the string type.
    assert isinstance(result, sa.types.VARCHAR)


def test_bench_discovery(benchmark, tmp_path: Path):
def _discover_catalog(connector):
Expand Down

0 comments on commit b3610ca

Please sign in to comment.