Skip to content

Commit

Permalink
Test other branch in condition
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Jul 15, 2022
1 parent d675a59 commit d586071
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 11 deletions.
15 changes: 5 additions & 10 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,18 +322,13 @@ def _write_starting_replication_value(self, context: Optional[dict]) -> None:

# Use start_date if it is more recent than the replication_key state
if "start_date" in self.config:
start_date_value = self.config["start_date"]
if not value:
value = self.config["start_date"]
value = start_date_value
elif self.is_timestamp_replication_key:
value_datetime = cast(datetime.datetime, pendulum.parse(value))
start_datetime = cast(
datetime.datetime, pendulum.parse(self.config["start_date"])
)
value = (
value
if value_datetime > start_datetime
else self.config["start_date"]
)
value = max(value, start_date_value, key=pendulum.parse)
else:
value = max(value, start_date_value)

write_starting_replication_value(state, value)

Expand Down
87 changes: 86 additions & 1 deletion tests/core/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
yield {"id": 3, "value": "India"}


class UnixTimestampIncrementalStream(SimpleTestStream):
name = "unix_ts"
schema = PropertiesList(
Property("id", IntegerType, required=True),
Property("value", StringType, required=True),
Property("updatedAt", IntegerType, required=True),
).to_dict()
replication_key = "updatedAt"


class RestTestStream(RESTStream):
"""Test RESTful stream class."""

Expand Down Expand Up @@ -85,6 +95,17 @@ def discover_streams(self) -> List[Stream]:
return [SimpleTestStream(self)]


class UnixTimestampTap(Tap):
"""Test tap class."""

name = "test-tap"
settings_jsonschema = PropertiesList(Property("start_date", IntegerType)).to_dict()

def discover_streams(self) -> List[Stream]:
"""List all streams."""
return [UnixTimestampIncrementalStream(self)]


@pytest.fixture
def tap() -> SimpleTestTap:
"""Tap instance."""
Expand All @@ -107,12 +128,40 @@ def tap() -> SimpleTestTap:
)


@pytest.fixture
def unix_tap() -> UnixTimestampTap:
"""Tap instance."""
catalog_dict = {
"streams": [
{
"key_properties": ["id"],
"tap_stream_id": UnixTimestampIncrementalStream.name,
"stream": UnixTimestampIncrementalStream.name,
"schema": UnixTimestampIncrementalStream.schema,
"replication_method": REPLICATION_FULL_TABLE,
"replication_key": None,
}
]
}
return UnixTimestampTap(
config={"start_date": "1640991660"},
parse_env_config=False,
catalog=catalog_dict,
)


@pytest.fixture
def stream(tap: SimpleTestTap) -> SimpleTestStream:
"""Create a new stream instance."""
return cast(SimpleTestStream, tap.load_streams()[0])


@pytest.fixture
def unix_timestamp_stream(unix_tap: UnixTimestampTap) -> UnixTimestampIncrementalStream:
"""Create a new stream instance."""
return cast(UnixTimestampIncrementalStream, unix_tap.load_streams()[0])


def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream):
"""Applying a catalog to a stream should overwrite fields."""
assert stream.primary_keys == []
Expand All @@ -129,7 +178,12 @@ def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream):
assert stream.forced_replication_method == REPLICATION_FULL_TABLE


def test_stream_starting_timestamp(tap: SimpleTestTap, stream: SimpleTestStream):
def test_stream_starting_timestamp(
tap: SimpleTestTap,
stream: SimpleTestStream,
unix_tap: UnixTimestampTap,
unix_timestamp_stream: UnixTimestampIncrementalStream,
):
"""Validate state and start_time setting handling."""
timestamp_value = "2021-02-01"

Expand Down Expand Up @@ -173,6 +227,37 @@ def test_stream_starting_timestamp(tap: SimpleTestTap, stream: SimpleTestStream)
stream.config.get("start_date")
)

timestamp_value = "2030-01-01"
tap.load_state(
{
"bookmarks": {
stream.name: {
"replication_key": stream.replication_key,
"replication_key_value": timestamp_value,
}
}
}
)
stream._write_starting_replication_value(None)
assert stream.get_starting_timestamp(None) == pendulum.parse(timestamp_value)

timestamp_value = "1640991600"
unix_tap.load_state(
{
"bookmarks": {
unix_timestamp_stream.name: {
"replication_key": unix_timestamp_stream.replication_key,
"replication_key_value": timestamp_value,
}
}
}
)
unix_timestamp_stream._write_starting_replication_value(None)
assert (
unix_timestamp_stream.get_starting_replication_key_value(None)
== unix_tap.config["start_date"]
)


@pytest.mark.parametrize(
"path,content,result",
Expand Down

0 comments on commit d586071

Please sign in to comment.