Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Ensure SQL streams are sorted when a replication key is set #1951

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions samples/sample_tap_sqlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class SQLiteStream(SQLStream):

connector_class = SQLiteConnector

# Use a smaller state message frequency to check intermediate state.
STATE_MSG_FREQUENCY = 10


class SQLiteTap(SQLTap):
"""The Tap class for SQLite."""
Expand Down
12 changes: 12 additions & 0 deletions singer_sdk/streams/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,5 +210,17 @@ def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
continue
yield transformed_record

@property
def is_sorted(self) -> bool:
    """Report whether this stream is expected to be sorted.

    The stream is treated as sorted exactly when it has a replication key.
    When `True`, incremental streams will attempt to resume if unexpectedly
    interrupted.

    Returns:
        `True` if `replication_key` is not `None`, otherwise `False`.
    """
    has_replication_key = self.replication_key is not None
    return has_replication_key


__all__ = ["SQLStream", "SQLConnector"]
20 changes: 13 additions & 7 deletions tests/samples/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ def _sqlite_sample_db(sqlite_connector):


@pytest.fixture
def sqlite_sample_tap(
_sqlite_sample_db,
sqlite_sample_db_config,
sqlite_sample_db_state,
) -> SQLiteTap:
_ = _sqlite_sample_db
def sqlite_sample_db_catalog(sqlite_sample_db_config) -> Catalog:
catalog_obj = Catalog.from_dict(
_get_tap_catalog(SQLiteTap, config=sqlite_sample_db_config, select_all=True),
)
Expand All @@ -55,9 +50,20 @@ def sqlite_sample_tap(
t2.key_properties = ["c1"]
t2.replication_key = "c1"
t2.replication_method = "INCREMENTAL"
return catalog_obj


@pytest.fixture
def sqlite_sample_tap(
    _sqlite_sample_db,
    sqlite_sample_db_config,
    sqlite_sample_db_state,
    sqlite_sample_db_catalog,
) -> SQLiteTap:
    """Build a `SQLiteTap` from the sample config, catalog and state fixtures.

    Args:
        _sqlite_sample_db: Requested but not used directly; presumably it
            ensures the sample database exists before the tap is created.
        sqlite_sample_db_config: Passed to the tap as `config`.
        sqlite_sample_db_state: Passed to the tap as `state`.
        sqlite_sample_db_catalog: Catalog object; its `to_dict()` result is
            passed to the tap as `catalog`.

    Returns:
        The configured `SQLiteTap` instance.
    """
    # The fixture's value is deliberately discarded.
    _ = _sqlite_sample_db
    return SQLiteTap(
        config=sqlite_sample_db_config,
        # Only the catalog fixture is used here; `catalog_obj` no longer
        # exists in this function, and `catalog` may be given only once.
        catalog=sqlite_sample_db_catalog.to_dict(),
        state=sqlite_sample_db_state,
    )

Expand Down
24 changes: 24 additions & 0 deletions tests/samples/test_tap_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
import json
import typing as t

import pytest
from click.testing import CliRunner
from freezegun import freeze_time

from samples.sample_tap_sqlite import SQLiteTap
from samples.sample_target_csv.csv_target import SampleTargetCSV
from singer_sdk import SQLStream
from singer_sdk._singerlib import MetadataMapping, StreamMetadata
from singer_sdk.testing import (
get_standard_tap_tests,
tap_sync_test,
tap_to_target_sync_test,
)

Expand Down Expand Up @@ -116,3 +119,24 @@ def test_sync_sqlite_to_csv(sqlite_sample_tap: SQLTap, tmp_path: Path):
sqlite_sample_tap,
SampleTargetCSV(config={"target_folder": f"{tmp_path}/"}),
)


@pytest.fixture
@freeze_time("2022-01-01T00:00:00Z")
def sqlite_sample_tap_state_messages(sqlite_sample_tap: SQLTap) -> list[dict]:
    """Collect the STATE messages produced by syncing the sample SQLite tap.

    The sync runs with the clock frozen at 2022-01-01T00:00:00Z.

    Args:
        sqlite_sample_tap: The tap handed to `tap_sync_test`.

    Returns:
        The JSON-decoded output lines whose `type` is `"STATE"`, in the
        order they were written.
    """
    tap_output, _ = tap_sync_test(sqlite_sample_tap)
    # Each output line is one JSON-encoded message.
    decoded = (json.loads(raw_line) for raw_line in tap_output.readlines())
    return [msg for msg in decoded if msg["type"] == "STATE"]


def test_sqlite_state(sqlite_sample_tap_state_messages):
    """Check that no STATE bookmark contains a `progress_markers` entry.

    Args:
        sqlite_sample_tap_state_messages: STATE messages to inspect. Each is
            read as `message["value"]["bookmarks"]`, a mapping whose values
            are the per-stream bookmarks.
    """
    assert all(
        "progress_markers" not in bookmark
        for message in sqlite_sample_tap_state_messages
        for bookmark in message["value"]["bookmarks"].values()
    )
9 changes: 6 additions & 3 deletions tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
from samples.sample_target_sqlite import SQLiteSink, SQLiteTarget
from singer_sdk import typing as th
from singer_sdk.testing import (
_get_tap_catalog,
tap_sync_test,
tap_to_target_sync_test,
target_sync_test,
)

if t.TYPE_CHECKING:
from singer_sdk._singerlib import Catalog
from singer_sdk.tap_base import SQLTap
from singer_sdk.target_base import SQLTarget

Expand Down Expand Up @@ -67,6 +67,7 @@ def sqlite_sample_target_batch(sqlite_target_test_config):
def test_sync_sqlite_to_sqlite(
sqlite_sample_tap: SQLTap,
sqlite_sample_target: SQLTarget,
sqlite_sample_db_catalog: Catalog,
):
"""End-to-end-to-end test for SQLite tap and target.

Expand All @@ -84,8 +85,10 @@ def test_sync_sqlite_to_sqlite(
)
orig_stdout.seek(0)
tapped_config = dict(sqlite_sample_target.config)
catalog = _get_tap_catalog(SQLiteTap, config=tapped_config, select_all=True)
tapped_target = SQLiteTap(config=tapped_config, catalog=catalog)
tapped_target = SQLiteTap(
config=tapped_config,
catalog=sqlite_sample_db_catalog.to_dict(),
)
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
new_stdout, _ = tap_sync_test(tapped_target)

orig_stdout.seek(0)
Expand Down