Skip to content

Commit

Permalink
fix: Ensure SQL streams are sorted when a replication key is set (#1951)
Browse files Browse the repository at this point in the history
* Add failing test

* Treat as sorted if replication key is set
  • Loading branch information
edgarrmondragon authored Sep 13, 2023
1 parent 10b61d2 commit 07198b0
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 10 deletions.
3 changes: 3 additions & 0 deletions samples/sample_tap_sqlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class SQLiteStream(SQLStream):

connector_class = SQLiteConnector

# Use a smaller state message frequency to check intermediate state.
STATE_MSG_FREQUENCY = 10


class SQLiteTap(SQLTap):
"""The Tap class for SQLite."""
Expand Down
12 changes: 12 additions & 0 deletions singer_sdk/streams/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,5 +210,17 @@ def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
continue
yield transformed_record

@property
def is_sorted(self) -> bool:
"""Expect stream to be sorted.
When `True`, incremental streams will attempt to resume if unexpectedly
interrupted.
Returns:
`True` if stream is sorted. Defaults to `False`.
"""
return self.replication_key is not None


__all__ = ["SQLStream", "SQLConnector"]
20 changes: 13 additions & 7 deletions tests/samples/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ def _sqlite_sample_db(sqlite_connector):


@pytest.fixture
def sqlite_sample_tap(
_sqlite_sample_db,
sqlite_sample_db_config,
sqlite_sample_db_state,
) -> SQLiteTap:
_ = _sqlite_sample_db
def sqlite_sample_db_catalog(sqlite_sample_db_config) -> Catalog:
catalog_obj = Catalog.from_dict(
_get_tap_catalog(SQLiteTap, config=sqlite_sample_db_config, select_all=True),
)
Expand All @@ -55,9 +50,20 @@ def sqlite_sample_tap(
t2.key_properties = ["c1"]
t2.replication_key = "c1"
t2.replication_method = "INCREMENTAL"
return catalog_obj


@pytest.fixture
def sqlite_sample_tap(
_sqlite_sample_db,
sqlite_sample_db_config,
sqlite_sample_db_state,
sqlite_sample_db_catalog,
) -> SQLiteTap:
_ = _sqlite_sample_db
return SQLiteTap(
config=sqlite_sample_db_config,
catalog=catalog_obj.to_dict(),
catalog=sqlite_sample_db_catalog.to_dict(),
state=sqlite_sample_db_state,
)

Expand Down
24 changes: 24 additions & 0 deletions tests/samples/test_tap_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
import json
import typing as t

import pytest
from click.testing import CliRunner
from freezegun import freeze_time

from samples.sample_tap_sqlite import SQLiteTap
from samples.sample_target_csv.csv_target import SampleTargetCSV
from singer_sdk import SQLStream
from singer_sdk._singerlib import MetadataMapping, StreamMetadata
from singer_sdk.testing import (
get_standard_tap_tests,
tap_sync_test,
tap_to_target_sync_test,
)

Expand Down Expand Up @@ -116,3 +119,24 @@ def test_sync_sqlite_to_csv(sqlite_sample_tap: SQLTap, tmp_path: Path):
sqlite_sample_tap,
SampleTargetCSV(config={"target_folder": f"{tmp_path}/"}),
)


@pytest.fixture
@freeze_time("2022-01-01T00:00:00Z")
def sqlite_sample_tap_state_messages(sqlite_sample_tap: SQLTap) -> list[dict]:
stdout, _ = tap_sync_test(sqlite_sample_tap)
state_messages = []
for line in stdout.readlines():
message = json.loads(line)
if message["type"] == "STATE":
state_messages.append(message)

return state_messages


def test_sqlite_state(sqlite_sample_tap_state_messages):
assert all(
"progress_markers" not in bookmark
for message in sqlite_sample_tap_state_messages
for bookmark in message["value"]["bookmarks"].values()
)
9 changes: 6 additions & 3 deletions tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
from samples.sample_target_sqlite import SQLiteSink, SQLiteTarget
from singer_sdk import typing as th
from singer_sdk.testing import (
_get_tap_catalog,
tap_sync_test,
tap_to_target_sync_test,
target_sync_test,
)

if t.TYPE_CHECKING:
from singer_sdk._singerlib import Catalog
from singer_sdk.tap_base import SQLTap
from singer_sdk.target_base import SQLTarget

Expand Down Expand Up @@ -67,6 +67,7 @@ def sqlite_sample_target_batch(sqlite_target_test_config):
def test_sync_sqlite_to_sqlite(
sqlite_sample_tap: SQLTap,
sqlite_sample_target: SQLTarget,
sqlite_sample_db_catalog: Catalog,
):
"""End-to-end-to-end test for SQLite tap and target.
Expand All @@ -84,8 +85,10 @@ def test_sync_sqlite_to_sqlite(
)
orig_stdout.seek(0)
tapped_config = dict(sqlite_sample_target.config)
catalog = _get_tap_catalog(SQLiteTap, config=tapped_config, select_all=True)
tapped_target = SQLiteTap(config=tapped_config, catalog=catalog)
tapped_target = SQLiteTap(
config=tapped_config,
catalog=sqlite_sample_db_catalog.to_dict(),
)
new_stdout, _ = tap_sync_test(tapped_target)

orig_stdout.seek(0)
Expand Down

0 comments on commit 07198b0

Please sign in to comment.