diff --git a/samples/sample_tap_sqlite/__init__.py b/samples/sample_tap_sqlite/__init__.py index 908c4f57a..49b4365f0 100644 --- a/samples/sample_tap_sqlite/__init__.py +++ b/samples/sample_tap_sqlite/__init__.py @@ -34,6 +34,9 @@ class SQLiteStream(SQLStream): connector_class = SQLiteConnector + # Use a smaller state message frequency to check intermediate state. + STATE_MSG_FREQUENCY = 10 + class SQLiteTap(SQLTap): """The Tap class for SQLite.""" diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index d5fb52219..18d2d8862 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -210,5 +210,17 @@ def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]: continue yield transformed_record + @property + def is_sorted(self) -> bool: + """Expect stream to be sorted. + + When `True`, incremental streams will attempt to resume if unexpectedly + interrupted. + + Returns: + `True` if stream is sorted. Defaults to `False`. + """ + return self.replication_key is not None + __all__ = ["SQLStream", "SQLConnector"] diff --git a/tests/samples/conftest.py b/tests/samples/conftest.py index 29560a330..c1467d791 100644 --- a/tests/samples/conftest.py +++ b/tests/samples/conftest.py @@ -34,12 +34,7 @@ def _sqlite_sample_db(sqlite_connector): @pytest.fixture -def sqlite_sample_tap( - _sqlite_sample_db, - sqlite_sample_db_config, - sqlite_sample_db_state, -) -> SQLiteTap: - _ = _sqlite_sample_db +def sqlite_sample_db_catalog(sqlite_sample_db_config) -> Catalog: catalog_obj = Catalog.from_dict( _get_tap_catalog(SQLiteTap, config=sqlite_sample_db_config, select_all=True), ) @@ -55,9 +50,20 @@ def sqlite_sample_tap( t2.key_properties = ["c1"] t2.replication_key = "c1" t2.replication_method = "INCREMENTAL" + return catalog_obj + + +@pytest.fixture +def sqlite_sample_tap( + _sqlite_sample_db, + sqlite_sample_db_config, + sqlite_sample_db_state, + sqlite_sample_db_catalog, +) -> SQLiteTap: + _ = _sqlite_sample_db return SQLiteTap( config=sqlite_sample_db_config, - catalog=catalog_obj.to_dict(), + catalog=sqlite_sample_db_catalog.to_dict(), state=sqlite_sample_db_state, ) diff --git a/tests/samples/test_tap_sqlite.py b/tests/samples/test_tap_sqlite.py index e2b1940da..e1cbbe152 100644 --- a/tests/samples/test_tap_sqlite.py +++ b/tests/samples/test_tap_sqlite.py @@ -3,7 +3,9 @@ import json import typing as t +import pytest from click.testing import CliRunner +from freezegun import freeze_time from samples.sample_tap_sqlite import SQLiteTap from samples.sample_target_csv.csv_target import SampleTargetCSV @@ -11,6 +13,7 @@ from singer_sdk._singerlib import MetadataMapping, StreamMetadata from singer_sdk.testing import ( get_standard_tap_tests, + tap_sync_test, tap_to_target_sync_test, ) @@ -116,3 +119,24 @@ def test_sync_sqlite_to_csv(sqlite_sample_tap: SQLTap, tmp_path: Path): sqlite_sample_tap, SampleTargetCSV(config={"target_folder": f"{tmp_path}/"}), ) + + +@pytest.fixture +@freeze_time("2022-01-01T00:00:00Z") +def sqlite_sample_tap_state_messages(sqlite_sample_tap: SQLTap) -> list[dict]: + stdout, _ = tap_sync_test(sqlite_sample_tap) + state_messages = [] + for line in stdout.readlines(): + message = json.loads(line) + if message["type"] == "STATE": + state_messages.append(message) + + return state_messages + + +def test_sqlite_state(sqlite_sample_tap_state_messages): + assert all( + "progress_markers" not in bookmark + for message in sqlite_sample_tap_state_messages + for bookmark in message["value"]["bookmarks"].values() + ) diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index 995cb03aa..727b760ba 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -19,13 +19,13 @@ from samples.sample_target_sqlite import SQLiteSink, SQLiteTarget from singer_sdk import typing as th from singer_sdk.testing import ( - _get_tap_catalog, tap_sync_test, tap_to_target_sync_test, target_sync_test, ) if t.TYPE_CHECKING: + from singer_sdk._singerlib import Catalog from singer_sdk.tap_base import SQLTap from singer_sdk.target_base import SQLTarget @@ -67,6 +67,7 @@ def sqlite_sample_target_batch(sqlite_target_test_config): def test_sync_sqlite_to_sqlite( sqlite_sample_tap: SQLTap, sqlite_sample_target: SQLTarget, + sqlite_sample_db_catalog: Catalog, ): """End-to-end-to-end test for SQLite tap and target. @@ -84,8 +85,10 @@ def test_sync_sqlite_to_sqlite( ) orig_stdout.seek(0) tapped_config = dict(sqlite_sample_target.config) - catalog = _get_tap_catalog(SQLiteTap, config=tapped_config, select_all=True) - tapped_target = SQLiteTap(config=tapped_config, catalog=catalog) + tapped_target = SQLiteTap( + config=tapped_config, + catalog=sqlite_sample_db_catalog.to_dict(), + ) new_stdout, _ = tap_sync_test(tapped_target) orig_stdout.seek(0)