feat(data-warehouse): Build a new postgres source #28660
Conversation
Hey @Gilbert09! 👋
PR Summary
This PR introduces significant changes to SQL data source handling and package dependencies, focusing on a new PostgreSQL implementation and memory management improvements.
- Added a new `postgres_source` function in `sql_v2.py` using the `psycopg` library with 10k batch fetching and scrollable cursors
- Implemented PyArrow memory pool management and debug logging through a new `PYARROW_DEBUG_LOGGING` environment variable (see the sketch after this comment)
- Updated core database packages, including psycopg (3.1.20 -> 3.2.4), SQLAlchemy (2.0.31 -> 2.0.38), and related dependencies
- Modified `TableLoader` execution options to improve memory usage with the `max_row_buffer` and `stream_results` parameters
- Introduced a `SourceResponse` dataclass to standardize data source operation responses

Note: Several issues need addressing before merge, including commented-out null column handling, debug print statements, and missing error handling in the new PostgreSQL implementation.
15 file(s) reviewed, 12 comment(s)
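The summary mentions a new PYARROW_DEBUG_LOGGING environment variable for memory debugging. A rough, hedged sketch of how such a flag could be wired up using PyArrow's built-in allocation logging (the variable name comes from the summary; the actual implementation in this PR may differ):

import os

import pyarrow as pa

# Hypothetical wiring for the PYARROW_DEBUG_LOGGING flag described above.
if os.environ.get("PYARROW_DEBUG_LOGGING") == "1":
    # Log every allocation and deallocation made through the default memory pool.
    pa.log_memory_allocations(True)

# pa.total_allocated_bytes() can then be sampled around batch processing
# to see how much memory the default pool currently holds.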
# TODO !!!!!
# pa_table = _handle_null_columns_with_definitions(pa_table, self._resource)
logic: This TODO needs to be resolved before merging. Removing null column handling without replacement could cause data integrity issues.
@dataclasses.dataclass
class SourceResponse:
    name: str
    items: Iterable[Any]
style: Using `Any` for the items type parameter reduces type safety. Consider using a more specific type or generic type parameter if possible.
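As a quick illustration of the generic approach suggested here (a sketch only; the type parameter and the pyarrow example in the comment are assumptions, not what the PR currently does):

import dataclasses
from collections.abc import Iterable
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclasses.dataclass
class SourceResponse(Generic[T]):
    name: str
    # Parameterizing the payload keeps type checkers aware of what each source yields,
    # e.g. SourceResponse[pa.Table] for Arrow-based sources.
    items: Iterable[T]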
try:
    if len(table_data) == 0:
        return pa.Table.from_pylist(table_data)

    uuid_exists = any(isinstance(value, uuid.UUID) for value in table_data[0].values())
    if uuid_exists:
        return pa.Table.from_pylist(_convert_uuid_to_string(table_data))

    return pa.Table.from_pylist(table_data)
except:
style: Removing empty list check means empty tables will hit the exception handler unnecessarily. Consider keeping the optimization.
print("===================") # noqa: T201 | ||
print("USING NEW SOURCE!!!") # noqa: T201 | ||
print("===================") # noqa: T201 |
style: Debug print statements should be removed before production
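If some signal is still wanted while the new source rolls out, a hedged alternative sketch is standard logging instead of prints (the logger setup here is illustrative; the codebase may use a different logging helper):

import logging

logger = logging.getLogger(__name__)

# Same information as the prints above, but controllable via log configuration
# rather than requiring a code change to silence it.
logger.info("Using new postgres source (sql_v2)")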
def get_rows() -> Iterator[Any]:
    with psycopg.connect(
        f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode={sslmode}"
    ) as connection:
        with connection.cursor(name=f"posthog_{team_id}_{table_name}", scrollable=True) as cursor:
            cursor.itersize = 10_000

            query = sql.SQL("SELECT * FROM {}").format(sql.Identifier(table_name))
            cursor.execute(query)

            column_names = [column.name for column in cursor.description or []]

            while True:
                rows = cursor.fetchmany(10_000)
                if not rows:
                    break

                yield table_from_py_list([dict(zip(column_names, row)) for row in rows])
style: No error handling for database connection failures or query execution errors. Should wrap in try/catch
Suggested change:

def get_rows() -> Iterator[Any]:
    try:
        with psycopg.connect(
            f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode={sslmode}"
        ) as connection:
            with connection.cursor(name=f"posthog_{team_id}_{table_name}", scrollable=True) as cursor:
                cursor.itersize = 10_000
                query = sql.SQL("SELECT * FROM {}").format(sql.Identifier(table_name))
                cursor.execute(query)
                column_names = [column.name for column in cursor.description or []]
                while True:
                    rows = cursor.fetchmany(10_000)
                    if not rows:
                        break
                    yield table_from_py_list([dict(zip(column_names, row)) for row in rows])
    except psycopg.Error as e:
        raise RuntimeError(f"Database error: {str(e)}") from e
    db_incremental_field_last_value: Optional[Any],
    using_ssl: Optional[bool] = True,
    team_id: Optional[int] = None,
    incremental_field: Optional[str] = None,
    incremental_field_type: Optional[IncrementalFieldType] = None,
logic: Incremental field parameters are passed in but never used in the query logic
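A minimal sketch of how those parameters could feed into the query, reusing the psycopg sql composition from the snippet above (the comparison and ordering semantics here are illustrative assumptions, not what the PR implements):

# Hypothetical use of the incremental parameters inside get_rows().
if incremental_field is not None and db_incremental_field_last_value is not None:
    query = sql.SQL("SELECT * FROM {} WHERE {} > %s ORDER BY {} ASC").format(
        sql.Identifier(table_name),
        sql.Identifier(incremental_field),
        sql.Identifier(incremental_field),
    )
    cursor.execute(query, (db_incremental_field_last_value,))
else:
    cursor.execute(sql.SQL("SELECT * FROM {}").format(sql.Identifier(table_name)))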
@@ -1,12 +1,19 @@
import asyncio
Non blocking but seems like a bunch of this should be tested eventually
100% - planning on adding separate tests for all of this soon
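One rough shape those tests could take, as a heavily hedged sketch (the fixture names, seeded table, and the exact postgres_source call signature are placeholders, not PostHog's actual test setup):

import pyarrow as pa


def test_postgres_source_reads_all_rows(postgres_connection_details, seeded_table):
    # Assumes postgres_source returns a SourceResponse whose `items` yields
    # pyarrow tables, as described elsewhere in this PR.
    response = postgres_source(
        **postgres_connection_details,
        table_name=seeded_table.name,
        team_id=1,
        db_incremental_field_last_value=None,
    )

    tables = list(response.items)

    assert sum(t.num_rows for t in tables) == seeded_table.row_count
    assert all(isinstance(t, pa.Table) for t in tables)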
seconds_column = pa.array(
    [row.as_py().total_seconds() if row.as_py() is not None else None for row in column]
)
table = table.set_column(table.schema.get_field_index(column_name), column_name, seconds_column)
column = table.column(column_name)
nit: Every `row.as_py()` call allocates memory for the Python type (a `timedelta`, I assume in this case). We could save some memory here by using pyarrow operations:
Suggested change:

if column.type.unit == "s":
    factor = 1
elif column.type.unit == "ms":
    factor = 1_000
elif column.type.unit == "us":
    factor = 1_000_000
elif column.type.unit == "ns":
    factor = 1_000_000_000
else:
    # Should never get here as we have covered all possible units.
    # But there is no way to assert this at compile time.
    # See for possible units: https://arrow.apache.org/docs/python/generated/pyarrow.duration.html
    raise ValueError(f"Invalid unit: {column.type.unit}")
# Dividing by the factor gets us an Int64 array of whole seconds, which is the
# same as what we would get by creating an array using Python's int.
seconds_column = pc.divide(column.cast(pa.int64()), factor)
table = table.set_column(table.schema.get_field_index(column_name), column_name, seconds_column)
column = table.column(column_name)
Great knowledge - thank you
    return table_from_iterator(iter(table_data))


def _process_batch(table_data: list[dict], schema: Optional[pa.Schema] = None) -> pa.Table:
nit: Would be nice if we could refactor this to work with an iterator, thus saving the materialization in the caller. But it's probably a lengthy refactor that could be done later.
Yeah, it was initially meant to take an iterator - but some things started to seem very hard to do, one for later for sure
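For reference, a rough sketch of the iterator-based shape being discussed (a hypothetical _process_batch_iter, not part of this PR; it assumes callers could consume per-chunk record batches instead of a single table):

from collections.abc import Iterator
from itertools import islice
from typing import Optional

import pyarrow as pa


def _process_batch_iter(
    rows: Iterator[dict], schema: Optional[pa.Schema] = None, chunk_size: int = 10_000
) -> Iterator[pa.RecordBatch]:
    # Consume the row iterator lazily in fixed-size chunks so the caller
    # never has to materialize the full result as a Python list.
    while True:
        chunk = list(islice(rows, chunk_size))
        if not chunk:
            return
        yield pa.RecordBatch.from_pylist(chunk, schema=schema)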
while True:
    rows = cursor.fetchmany(10_000)
    if not rows:
        break
nit: Just an alternative that I think would be cleaner:
Suggested change:

cursor.arraysize = 10_000
for rows in cursor:
Would have to test it, unsure if it works (but I think it should).
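One thing worth checking against the psycopg docs: iterating a cursor yields individual rows rather than fetchmany-sized batches, so keeping the per-batch processing would need a different loop. A compact alternative sketch that preserves the batching (names reused from the snippet above):

from functools import partial

# fetchmany returns an empty list once the cursor is exhausted,
# which acts as the sentinel that stops iter().
for rows in iter(partial(cursor.fetchmany, 10_000), []):
    yield table_from_py_list([dict(zip(column_names, row)) for row in rows])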
A few nits, feel free to ignore. Work looks good!
Problem

Changes
- Moved `sql_database/arrow_helpers.py` and other logic into `pipeline/utils.py`, out of `sql_database/*.py`, to be more central for sources to use
- Added `SourceResponse`, which is the beginning of some work I'm doing to abstract out how sources are built to be more centralised in a single place - this also helps us move away from `dlt` helpers too

Does this work well for both Cloud and self-hosted?
Yes

How did you test this code?
Unit tests are all passing for this new postgres source