From d586071e65d7ae52c43f60aa1e5abb3b5cca1dda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Fri, 15 Jul 2022 19:01:26 +0200 Subject: [PATCH] Test other branch in condition --- singer_sdk/streams/core.py | 15 +++---- tests/core/test_streams.py | 87 +++++++++++++++++++++++++++++++++++++- 2 files changed, 91 insertions(+), 11 deletions(-) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 3e15d921d..4ffa49e6b 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -322,18 +322,13 @@ def _write_starting_replication_value(self, context: Optional[dict]) -> None: # Use start_date if it is more recent than the replication_key state if "start_date" in self.config: + start_date_value = self.config["start_date"] if not value: - value = self.config["start_date"] + value = start_date_value elif self.is_timestamp_replication_key: - value_datetime = cast(datetime.datetime, pendulum.parse(value)) - start_datetime = cast( - datetime.datetime, pendulum.parse(self.config["start_date"]) - ) - value = ( - value - if value_datetime > start_datetime - else self.config["start_date"] - ) + value = max(value, start_date_value, key=pendulum.parse) + else: + value = max(value, start_date_value) write_starting_replication_value(state, value) diff --git a/tests/core/test_streams.py b/tests/core/test_streams.py index fd403f3ed..07c5fd1ed 100644 --- a/tests/core/test_streams.py +++ b/tests/core/test_streams.py @@ -48,6 +48,16 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: yield {"id": 3, "value": "India"} +class UnixTimestampIncrementalStream(SimpleTestStream): + name = "unix_ts" + schema = PropertiesList( + Property("id", IntegerType, required=True), + Property("value", StringType, required=True), + Property("updatedAt", IntegerType, required=True), + ).to_dict() + replication_key = "updatedAt" + + class RestTestStream(RESTStream): """Test RESTful stream class.""" @@ -85,6 +95,17 @@ def discover_streams(self) -> List[Stream]: return [SimpleTestStream(self)] +class UnixTimestampTap(Tap): + """Test tap class.""" + + name = "test-tap" + settings_jsonschema = PropertiesList(Property("start_date", IntegerType)).to_dict() + + def discover_streams(self) -> List[Stream]: + """List all streams.""" + return [UnixTimestampIncrementalStream(self)] + + @pytest.fixture def tap() -> SimpleTestTap: """Tap instance.""" @@ -107,12 +128,40 @@ def tap() -> SimpleTestTap: ) +@pytest.fixture +def unix_tap() -> UnixTimestampTap: + """Tap instance.""" + catalog_dict = { + "streams": [ + { + "key_properties": ["id"], + "tap_stream_id": UnixTimestampIncrementalStream.name, + "stream": UnixTimestampIncrementalStream.name, + "schema": UnixTimestampIncrementalStream.schema, + "replication_method": REPLICATION_FULL_TABLE, + "replication_key": None, + } + ] + } + return UnixTimestampTap( + config={"start_date": "1640991660"}, + parse_env_config=False, + catalog=catalog_dict, + ) + + @pytest.fixture def stream(tap: SimpleTestTap) -> SimpleTestStream: """Create a new stream instance.""" return cast(SimpleTestStream, tap.load_streams()[0]) +@pytest.fixture +def unix_timestamp_stream(unix_tap: UnixTimestampTap) -> UnixTimestampIncrementalStream: + """Create a new stream instance.""" + return cast(UnixTimestampIncrementalStream, unix_tap.load_streams()[0]) + + def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream): """Applying a catalog to a stream should overwrite fields.""" assert stream.primary_keys == [] @@ -129,7 +178,12 @@ def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream): assert stream.forced_replication_method == REPLICATION_FULL_TABLE -def test_stream_starting_timestamp(tap: SimpleTestTap, stream: SimpleTestStream): +def test_stream_starting_timestamp( + tap: SimpleTestTap, + stream: SimpleTestStream, + unix_tap: UnixTimestampTap, + unix_timestamp_stream: UnixTimestampIncrementalStream, +): """Validate state and start_time setting handling.""" timestamp_value = "2021-02-01" @@ -173,6 +227,37 @@ def test_stream_starting_timestamp(tap: SimpleTestTap, stream: SimpleTestStream) stream.config.get("start_date") ) + timestamp_value = "2030-01-01" + tap.load_state( + { + "bookmarks": { + stream.name: { + "replication_key": stream.replication_key, + "replication_key_value": timestamp_value, + } + } + } + ) + stream._write_starting_replication_value(None) + assert stream.get_starting_timestamp(None) == pendulum.parse(timestamp_value) + + timestamp_value = "1640991600" + unix_tap.load_state( + { + "bookmarks": { + unix_timestamp_stream.name: { + "replication_key": unix_timestamp_stream.replication_key, + "replication_key_value": timestamp_value, + } + } + } + ) + unix_timestamp_stream._write_starting_replication_value(None) + assert ( + unix_timestamp_stream.get_starting_replication_key_value(None) + == unix_tap.config["start_date"] + ) + @pytest.mark.parametrize( "path,content,result",