From 43e176940b266c3c84a3fa071571e2e04b3919fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Fri, 15 Dec 2023 18:07:47 -0600 Subject: [PATCH] fix(taps): Respect forced replication method when retrieving state --- poetry.lock | 2 +- pyproject.toml | 2 +- singer_sdk/streams/core.py | 6 +++++- tests/core/conftest.py | 13 ++++++++++- tests/core/test_streams.py | 44 +++++++++++++++++++++++++++----------- 5 files changed, 50 insertions(+), 17 deletions(-) diff --git a/poetry.lock b/poetry.lock index bac536d787..0362583a4d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3044,4 +3044,4 @@ testing = ["pytest", "pytest-durations"] [metadata] lock-version = "2.0" python-versions = ">=3.7.1" -content-hash = "da331fe462131cb8d6eed319a54142818657bc4ea80f0e61ff6a1dcbcb09a47e" +content-hash = "ca6c5bd75caa638f190baf7b7b2ae1a93dabe382f0e3c4aa9605f1bc660250fe" diff --git a/pyproject.toml b/pyproject.toml index 7e5de9ba98..8d87b98068 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,7 +69,7 @@ requests = ">=2.25.1" simpleeval = ">=0.9.13" simplejson = ">=3.17.6" sqlalchemy = ">=1.4,<3.0" -typing-extensions = ">=4.2.0" +typing-extensions = ">=4.5.0" # urllib3 2.0 is not compatible with botocore urllib3 = ">=1.26,<2" diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index d235987a76..e76d19d805 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -246,7 +246,11 @@ def get_starting_replication_key_value( """ state = self.get_context_state(context) - return get_starting_replication_value(state) + return ( + get_starting_replication_value(state) + if self.replication_method != REPLICATION_FULL_TABLE + else None + ) def get_starting_timestamp(self, context: dict | None) -> datetime.datetime | None: """Get starting replication timestamp. diff --git a/tests/core/conftest.py b/tests/core/conftest.py index 06355ccfef..c91cd92bde 100644 --- a/tests/core/conftest.py +++ b/tests/core/conftest.py @@ -3,9 +3,11 @@ from __future__ import annotations import typing as t +from contextlib import contextmanager import pendulum import pytest +from typing_extensions import override from singer_sdk import Stream, Tap from singer_sdk.typing import ( @@ -32,15 +34,24 @@ def __init__(self, tap: Tap): """Create a new stream.""" super().__init__(tap, schema=self.schema, name=self.name) + @override def get_records( self, - context: dict | None, # noqa: ARG002 + context: dict | None, ) -> t.Iterable[dict[str, t.Any]]: """Generate records.""" yield {"id": 1, "value": "Egypt"} yield {"id": 2, "value": "Germany"} yield {"id": 3, "value": "India"} + @contextmanager + def with_replication_method(self, method: str | None) -> t.Iterator[None]: + """Context manager to temporarily override the replication method.""" + original_method = self.forced_replication_method + self.forced_replication_method = method + yield + self.forced_replication_method = original_method + class UnixTimestampIncrementalStream(SimpleTestStream): name = "unix_ts" diff --git a/tests/core/test_streams.py b/tests/core/test_streams.py index 8a415e55d9..f3d9aba84f 100644 --- a/tests/core/test_streams.py +++ b/tests/core/test_streams.py @@ -46,19 +46,18 @@ def get_next_page_token( response: requests.Response, previous_token: str | None, # noqa: ARG002 ) -> str | None: - if self.next_page_token_jsonpath: - all_matches = extract_jsonpath( - self.next_page_token_jsonpath, - response.json(), - ) - try: - return first(all_matches) - except StopIteration: - return None - - else: + if not self.next_page_token_jsonpath: return response.headers.get("X-Next-Page", None) + all_matches = extract_jsonpath( + self.next_page_token_jsonpath, + response.json(), + ) + try: + return first(all_matches) + except StopIteration: + return None + class GraphqlTestStream(GraphQLStream): """Test Graphql stream class.""" @@ -111,22 +110,32 @@ def test_stream_apply_catalog(stream: Stream): @pytest.mark.parametrize( - "stream_name,bookmark_value,expected_starting_value", + "stream_name,forced_replication_method,bookmark_value,expected_starting_value", [ pytest.param( "test", None, + None, pendulum.parse(CONFIG_START_DATE), id="datetime-repl-key-no-state", ), pytest.param( "test", + None, "2021-02-01", pendulum.datetime(2021, 2, 1), id="datetime-repl-key-recent-bookmark", ), pytest.param( "test", + REPLICATION_FULL_TABLE, + "2021-02-01", + None, + id="datetime-forced-full-table", + ), + pytest.param( + "test", + None, "2020-01-01", pendulum.parse(CONFIG_START_DATE), id="datetime-repl-key-old-bookmark", @@ -134,17 +143,20 @@ def test_stream_apply_catalog(stream: Stream): pytest.param( "unix_ts", None, + None, CONFIG_START_DATE, id="naive-unix-ts-repl-key-no-state", ), pytest.param( "unix_ts", + None, "1612137600", "1612137600", id="naive-unix-ts-repl-key-recent-bookmark", ), pytest.param( "unix_ts", + None, "1577858400", "1577858400", id="naive-unix-ts-repl-key-old-bookmark", @@ -152,17 +164,20 @@ def test_stream_apply_catalog(stream: Stream): pytest.param( "unix_ts_override", None, + None, CONFIG_START_DATE, id="unix-ts-repl-key-no-state", ), pytest.param( "unix_ts_override", + None, "1612137600", "1612137600", id="unix-ts-repl-key-recent-bookmark", ), pytest.param( "unix_ts_override", + None, "1577858400", pendulum.parse(CONFIG_START_DATE).format("X"), id="unix-ts-repl-key-old-bookmark", @@ -172,6 +187,7 @@ def test_stream_apply_catalog(stream: Stream): def test_stream_starting_timestamp( tap: Tap, stream_name: str, + forced_replication_method: str | None, bookmark_value: str, expected_starting_value: t.Any, ): @@ -194,7 +210,9 @@ def test_stream_starting_timestamp( }, ) stream._write_starting_replication_value(None) - assert get_starting_value(None) == expected_starting_value + + with stream.with_replication_method(forced_replication_method): + assert get_starting_value(None) == expected_starting_value def test_stream_invalid_replication_key(tap: SimpleTestTap):