Skip to content

Commit

Permalink
fix(taps): Use nulls_first when available to order NULL results i…
Browse files Browse the repository at this point in the history
…n incremental SQL streams (#2094)

fix: Use `nulls_first` to order results in incremental SQL streams
  • Loading branch information
edgarrmondragon authored Dec 8, 2023
1 parent d755024 commit 5293f50
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
# Show typehints in the description, along with parameter descriptions
autodoc_typehints = "signature"
autodoc_class_signature = "separated"
autodoc_member_order = "groupwise"

# -- Options for HTML output -------------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions samples/sample_tap_sqlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class SQLiteStream(SQLStream):
"""

connector_class = SQLiteConnector
supports_nulls_first = True

# Use a smaller state message frequency to check intermediate state.
STATE_MSG_FREQUENCY = 10
Expand Down
12 changes: 11 additions & 1 deletion singer_sdk/streams/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import abc
import typing as t

from sqlalchemy import nulls_first

import singer_sdk.helpers._catalog as catalog
from singer_sdk._singerlib import CatalogEntry, MetadataMapping
from singer_sdk.connectors import SQLConnector
Expand All @@ -20,6 +22,9 @@ class SQLStream(Stream, metaclass=abc.ABCMeta):
connector_class = SQLConnector
_cached_schema: dict | None = None

supports_nulls_first: bool = False
"""Whether the database supports the NULLS FIRST/LAST syntax."""

def __init__(
self,
tap: Tap,
Expand Down Expand Up @@ -189,7 +194,12 @@ def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:

if self.replication_key:
replication_key_col = table.columns[self.replication_key]
query = query.order_by(replication_key_col)
order_by = (
nulls_first(replication_key_col.asc())
if self.supports_nulls_first
else replication_key_col.asc()
)
query = query.order_by(order_by)

start_val = self.get_starting_replication_key_value(context)
if start_val:
Expand Down

0 comments on commit 5293f50

Please sign in to comment.