Skip to content

Commit

Permalink
feat: Added an extensible API for stream schema sources
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Feb 7, 2025
1 parent 61a0b88 commit 282ad28
Show file tree
Hide file tree
Showing 6 changed files with 9,044 additions and 68 deletions.
13 changes: 7 additions & 6 deletions samples/sample_tap_gitlab/gitlab_rest_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from singer_sdk.authenticators import SimpleAuthenticator
from singer_sdk.pagination import SimpleHeaderPaginator
from singer_sdk.schema import LocalSchemaSource
from singer_sdk.streams.rest import RESTStream
from singer_sdk.typing import (
ArrayType,
Expand All @@ -19,7 +20,7 @@
StringType,
)

SCHEMAS_DIR = importlib.resources.files(__package__) / "schemas"
LOCAL_SCHEMAS = LocalSchemaSource(importlib.resources.files(__package__) / "schemas")

DEFAULT_URL_BASE = "https://gitlab.com/api/v4"

Expand Down Expand Up @@ -103,7 +104,7 @@ class ProjectsStream(ProjectBasedStream):
primary_keys = ("id",)
replication_key = "last_activity_at"
is_sorted = True
schema_filepath = SCHEMAS_DIR / "projects.json"
schema = LOCAL_SCHEMAS("projects")


class ReleasesStream(ProjectBasedStream):
Expand All @@ -113,7 +114,7 @@ class ReleasesStream(ProjectBasedStream):
path = "/projects/{project_id}/releases"
primary_keys = ("project_id", "tag_name")
replication_key = None
schema_filepath = SCHEMAS_DIR / "releases.json"
schema = LOCAL_SCHEMAS("releases")


class IssuesStream(ProjectBasedStream):
Expand All @@ -124,7 +125,7 @@ class IssuesStream(ProjectBasedStream):
primary_keys = ("id",)
replication_key = "updated_at"
is_sorted = False
schema_filepath = SCHEMAS_DIR / "issues.json"
schema = LOCAL_SCHEMAS("issues")


class CommitsStream(ProjectBasedStream):
Expand All @@ -137,7 +138,7 @@ class CommitsStream(ProjectBasedStream):
primary_keys = ("id",)
replication_key = "created_at"
is_sorted = False
schema_filepath = SCHEMAS_DIR / "commits.json"
schema = LOCAL_SCHEMAS("commits")


class EpicsStream(ProjectBasedStream):
Expand Down Expand Up @@ -202,7 +203,7 @@ class EpicIssuesStream(GitlabStream):
path = "/groups/{group_id}/epics/{epic_iid}/issues"
primary_keys = ("id",)
replication_key = None
schema_filepath = SCHEMAS_DIR / "epic_issues.json"
schema = LOCAL_SCHEMAS("epic_issues")
parent_stream_type = EpicsStream # Stream should wait for parents to complete.

def get_url_params(
Expand Down
108 changes: 108 additions & 0 deletions singer_sdk/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""Schema sources."""

from __future__ import annotations

import functools
import json
import sys
import typing as t
from pathlib import Path

import requests

from singer_sdk._singerlib import resolve_schema_references

if sys.version_info < (3, 12):
from importlib.abc import Traversable
else:
from importlib.resources.abc import Traversable


class BaseSchemaSource:
    """Common base for schema sources.

    Instances are callable: ``source(...)`` forwards its arguments to
    :meth:`get_schema`. Each instance owns a ``_registry`` dictionary that
    subclasses may use to cache schemas they have already produced.
    """

    def __init__(self) -> None:
        """Create the source with an empty schema registry."""
        self._registry: dict[str, dict] = dict()

    def get_schema(self, *args: t.Any, **kwargs: t.Any) -> dict:
        """Return the schema identified by the given arguments.

        The base implementation always raises; subclasses override it.

        Raises:
            NotImplementedError: If the method is not implemented by the subclass.
        """
        raise NotImplementedError("Subclasses must implement this method.")

    def __call__(self, *args: t.Any, **kwargs: t.Any) -> dict:
        """Look up a schema by calling the source directly.

        All positional and keyword arguments are passed to :meth:`get_schema`
        unchanged.

        Returns:
            The schema dictionary.
        """
        schema = self.get_schema(*args, **kwargs)
        return schema


class LocalSchemaSource(BaseSchemaSource):
    """Schema source that reads ``<name>.json`` files from a directory."""

    def __init__(self, path: Path | Traversable) -> None:
        """Initialize the schema source.

        Args:
            path: Directory (filesystem path or importlib resource) that
                contains the ``<name>.json`` schema files.
        """
        super().__init__()
        self.path = path

    def get_schema(self, name: str) -> dict:
        """Get the schema stored in ``<name>.json``.

        The file is read and parsed on the first request for ``name``; later
        requests return the same cached dictionary object.

        Args:
            name: Name of the stream.

        Returns:
            The schema dictionary.
        """
        if name not in self._registry:
            schema_path = self.path / f"{name}.json"
            # Decode explicitly as UTF-8 rather than relying on the
            # platform's locale encoding, which can mis-decode non-ASCII text.
            raw = schema_path.read_text(encoding="utf-8")
            self._registry[name] = json.loads(raw)

        return self._registry[name]


class OpenAPISchemaSource(BaseSchemaSource):
    """Schema source backed by the ``components`` of an OpenAPI document.

    The document is loaded on first access to :attr:`spec_dict` and kept for
    the lifetime of the instance; schemas returned by :meth:`get_schema` are
    cached per reference name.
    """

    def __init__(self, path: str | Path | Traversable) -> None:
        """Initialize the schema source.

        Args:
            path: Location of the OpenAPI JSON document: a ``Path`` or
                ``Traversable`` that is read directly, or a string starting
                with ``http`` that is fetched with an HTTP GET.
        """
        super().__init__()
        self.path = path

    @functools.cached_property
    def spec_dict(self) -> dict:
        """OpenAPI spec dictionary.

        Returns:
            The parsed OpenAPI document.

        Raises:
            ValueError: If the path type is not supported.
        """
        if isinstance(self.path, (Path, Traversable)):
            # Decode explicitly as UTF-8 rather than relying on the
            # platform's locale encoding.
            raw = self.path.read_text(encoding="utf-8")
            return json.loads(raw)  # type: ignore[no-any-return]

        if self.path.startswith("http"):
            response = requests.get(self.path, timeout=10)
            # Surface HTTP errors here instead of caching an error payload
            # as if it were the spec.
            response.raise_for_status()
            return response.json()  # type: ignore[no-any-return]

        msg = f"Unsupported path type: {self.path}"
        raise ValueError(msg)

    def get_schema(self, ref: str) -> dict:
        """Get schema from reference.

        Builds a ``$ref`` to ``#/components/schemas/<ref>`` alongside the
        spec's ``components`` section and passes it to
        ``resolve_schema_references``; the result is cached under ``ref``.

        Args:
            ref: Reference to the schema.

        Returns:
            The schema dictionary.
        """
        if ref not in self._registry:
            schema = {
                "$ref": f"#/components/schemas/{ref}",
                "components": self.spec_dict["components"],
            }
            self._registry[ref] = resolve_schema_references(schema)

        return self._registry[ref]
34 changes: 34 additions & 0 deletions tests/_singerlib/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,40 @@
}


def test_simple_schema():
    """A flat object schema survives a ``Schema`` round trip unchanged."""
    coordinate_properties = {
        "latitude": {"type": "number", "minimum": -90, "maximum": 90},
        "longitude": {"type": "number", "minimum": -180, "maximum": 180},
    }
    simple_schema = {
        "title": "Longitude and Latitude Values",
        "description": "A geographical coordinate.",
        "required": ["latitude", "longitude"],
        "type": "object",
        "properties": coordinate_properties,
    }

    parsed = Schema.from_dict(simple_schema)

    # The dictionary form must match the input exactly.
    assert parsed.to_dict() == simple_schema
    assert parsed.required == ["latitude", "longitude"]

    # Nested property definitions are themselves parsed into Schema objects.
    assert isinstance(parsed.properties["latitude"], Schema)
    latitude_schema = parsed.properties["latitude"]
    assert latitude_schema.type == "number"


def test_schema_with_items():
    """An array property's ``items`` definition is parsed into a ``Schema``."""
    fruits_property = {"type": "array", "items": {"type": "string"}}
    schema = {
        "description": "A representation of a person, company, organization, or place",
        "type": "object",
        "properties": {"fruits": fruits_property},
    }

    parsed = Schema.from_dict(schema)

    # The dictionary form must match the input exactly.
    assert parsed.to_dict() == schema

    # Both the property and its item definition become Schema objects.
    assert isinstance(parsed.properties["fruits"], Schema)
    fruits_schema = parsed.properties["fruits"]
    assert isinstance(fruits_schema.items, Schema)
    assert fruits_schema.items.type == "string"


@pytest.mark.parametrize(
"schema,expected",
[
Expand Down
86 changes: 24 additions & 62 deletions tests/core/test_schema.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,32 @@
"""
Testing that Schema can convert schemas losslessly from and to dicts.
Schemas are taken from these examples:
https://json-schema.org/learn/miscellaneous-examples.html
NOTE: The following properties are not currently supported:
pattern
unevaluatedProperties
propertyNames
minProperties
maxProperties
prefixItems
contains
minContains
maxContains
minItems
maxItems
uniqueItems
enum
const
contentMediaType
contentEncoding
allOf
oneOf
not
Some of these could be trivially added (if they are SIMPLE_PROPERTIES).
Some might need more thinking if they can contain schemas (though note that we also
treat 'additionalProperties', 'anyOf' and 'patternProperties' as SIMPLE even though they
can contain schemas).
"""
"""Test the schema sources."""

from __future__ import annotations

from singer_sdk._singerlib import Schema
import typing as t

from singer_sdk.schema import LocalSchemaSource, OpenAPISchemaSource

if t.TYPE_CHECKING:
import pytest

def test_simple_schema():
    """A flat object schema survives a ``Schema`` round trip unchanged."""
    coordinate_properties = {
        "latitude": {"type": "number", "minimum": -90, "maximum": 90},
        "longitude": {"type": "number", "minimum": -180, "maximum": 180},
    }
    simple_schema = {
        "title": "Longitude and Latitude Values",
        "description": "A geographical coordinate.",
        "required": ["latitude", "longitude"],
        "type": "object",
        "properties": coordinate_properties,
    }

    parsed = Schema.from_dict(simple_schema)

    # The dictionary form must match the input exactly.
    assert parsed.to_dict() == simple_schema
    assert parsed.required == ["latitude", "longitude"]

    # Nested property definitions are themselves parsed into Schema objects.
    assert isinstance(parsed.properties["latitude"], Schema)
    latitude_schema = parsed.properties["latitude"]
    assert latitude_schema.type == "number"
def test_local_schema_source(pytestconfig: pytest.Config):
    """Load the ``user`` schema through a ``LocalSchemaSource``."""
    fixtures_dir = pytestconfig.rootpath / "tests/fixtures/schemas"
    user_schema = LocalSchemaSource(fixtures_dir)("user")

    # The source yields a plain dictionary describing an object type.
    assert isinstance(user_schema, dict)
    assert user_schema["type"] == "object"
    assert "items" not in user_schema

    # The object schema declares an "id" property.
    assert "properties" in user_schema
    assert "id" in user_schema["properties"]


def test_schema_with_items():
    """An array property's ``items`` definition is parsed into a ``Schema``."""
    fruits_property = {"type": "array", "items": {"type": "string"}}
    schema = {
        "description": "A representation of a person, company, organization, or place",
        "type": "object",
        "properties": {"fruits": fruits_property},
    }

    parsed = Schema.from_dict(schema)

    # The dictionary form must match the input exactly.
    assert parsed.to_dict() == schema

    # Both the property and its item definition become Schema objects.
    assert isinstance(parsed.properties["fruits"], Schema)
    fruits_schema = parsed.properties["fruits"]
    assert isinstance(fruits_schema.items, Schema)
    assert fruits_schema.items.type == "string"
def test_openapi_schema_source(pytestconfig: pytest.Config):
    """Load the ``ProjectListItem`` schema through an ``OpenAPISchemaSource``."""
    spec_file = pytestconfig.rootpath / "tests/fixtures/openapi.json"
    project_schema = OpenAPISchemaSource(spec_file)("ProjectListItem")

    # The source yields a plain dictionary describing an object type.
    assert isinstance(project_schema, dict)
    assert project_schema["type"] == "object"
    assert "items" not in project_schema

    # The object schema declares an "id" property.
    assert "properties" in project_schema
    assert "id" in project_schema["properties"]
Loading

0 comments on commit 282ad28

Please sign in to comment.