Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: deprecate usage of cursor.execute statements in favor of the in class implementation of execute. #60748

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
61 changes: 41 additions & 20 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1651,10 +1651,18 @@ def run_transaction(self):

def execute(self, sql: str | Select | TextClause, params=None):
"""Simple passthrough to SQLAlchemy connectable"""
from sqlalchemy.exc import DBAPIError as SQLAlchemyDatabaseError

args = [] if params is None else [params]
if isinstance(sql, str):
return self.con.exec_driver_sql(sql, *args)
return self.con.execute(sql, *args)
execute_function = self.con.exec_driver_sql
else:
execute_function = self.con.execute

try:
return execute_function(sql, *args)
except SQLAlchemyDatabaseError as exc:
raise DatabaseError(f"Execution failed on sql '{sql}': {exc}") from exc

def read_table(
self,
Expand Down Expand Up @@ -2108,17 +2116,19 @@ def run_transaction(self):
self.con.commit()

def execute(self, sql: str | Select | TextClause, params=None):
from adbc_driver_manager import DatabaseError as ADBCDatabaseError

if not isinstance(sql, str):
raise TypeError("Query must be a string unless using sqlalchemy.")
args = [] if params is None else [params]
cur = self.con.cursor()
try:
cur.execute(sql, *args)
return cur
except Exception as exc:
except ADBCDatabaseError as exc:
try:
self.con.rollback()
except Exception as inner_exc: # pragma: no cover
except ADBCDatabaseError as inner_exc: # pragma: no cover
ex = DatabaseError(
f"Execution failed on sql: {sql}\n{exc}\nunable to rollback"
)
Expand Down Expand Up @@ -2207,8 +2217,7 @@ def read_table(
else:
stmt = f"SELECT {select_list} FROM {table_name}"

with self.con.cursor() as cur:
cur.execute(stmt)
with self.execute(stmt) as cur:
pa_table = cur.fetch_arrow_table()
df = arrow_table_to_pandas(pa_table, dtype_backend=dtype_backend)

Expand Down Expand Up @@ -2278,8 +2287,7 @@ def read_query(
if chunksize:
raise NotImplementedError("'chunksize' is not implemented for ADBC drivers")

with self.con.cursor() as cur:
cur.execute(sql)
with self.execute(sql) as cur:
pa_table = cur.fetch_arrow_table()
df = arrow_table_to_pandas(pa_table, dtype_backend=dtype_backend)

Expand Down Expand Up @@ -2335,6 +2343,9 @@ def to_sql(
engine : {'auto', 'sqlalchemy'}, default 'auto'
Raises NotImplementedError if not set to 'auto'
"""
from adbc_driver_manager import DatabaseError as ADBCDatabaseError
import pyarrow as pa

if index_label:
raise NotImplementedError(
"'index_label' is not implemented for ADBC drivers"
Expand Down Expand Up @@ -2364,22 +2375,25 @@ def to_sql(
if if_exists == "fail":
raise ValueError(f"Table '{table_name}' already exists.")
elif if_exists == "replace":
with self.con.cursor() as cur:
cur.execute(f"DROP TABLE {table_name}")
sql_statement = f"DROP TABLE {table_name}"
self.execute(sql_statement).close()
elif if_exists == "append":
mode = "append"

import pyarrow as pa

try:
tbl = pa.Table.from_pandas(frame, preserve_index=index)
except pa.ArrowNotImplementedError as exc:
raise ValueError("datatypes not supported") from exc

with self.con.cursor() as cur:
total_inserted = cur.adbc_ingest(
table_name=name, data=tbl, mode=mode, db_schema_name=schema
)
try:
total_inserted = cur.adbc_ingest(
table_name=name, data=tbl, mode=mode, db_schema_name=schema
)
except ADBCDatabaseError as exc:
raise DatabaseError(
f"Failed to insert records on table={name} with {mode=}"
) from exc

self.con.commit()
return total_inserted
Expand Down Expand Up @@ -2496,9 +2510,9 @@ def sql_schema(self) -> str:
return str(";\n".join(self.table))

def _execute_create(self) -> None:
with self.pd_sql.run_transaction() as conn:
with self.pd_sql.run_transaction():
for stmt in self.table:
conn.execute(stmt)
self.pd_sql.execute(stmt).close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the need for .close() here? I am hoping we can avoid anything that implicitly changes the transaction state

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't say there's a need for that, but it is usually a good practice to close any cursor that is opened. We can remove to maintain the status quo - no problem on that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh OK I misread this as closing the transaction. Does whatever pd_sql.execute return not follow the context manager protocol? We should be preferring with statements whenever a context like that needs to be managed

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, no worries. I'm super thankful for all the help and attentive comments.

Yeah, it does implement the context manager protocol. This was a personal choice and has no technical reason:

    with self.pd_sql.run_transaction():
            for stmt in self.table:
                self.pd_sql.execute(stmt).close()

Reads better to me than

    with self.pd_sql.run_transaction():
            for stmt in self.table:
                with self.pd_sql.execute(stmt):
                    pass

What you think ? :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm always in favor of using the context manager over calling close manually.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. Let me try to illustrate that with an example from the codebase.
We can say that the ADBCDatabase implementation is the same as of SQLiteDatabase

pandas/pandas/io/sql.py

Lines 2110 to 2128 in c0c778b

def execute(self, sql: str | Select | TextClause, params=None):
if not isinstance(sql, str):
raise TypeError("Query must be a string unless using sqlalchemy.")
args = [] if params is None else [params]
cur = self.con.cursor()
try:
cur.execute(sql, *args)
return cur
except Exception as exc:
try:
self.con.rollback()
except Exception as inner_exc: # pragma: no cover
ex = DatabaseError(
f"Execution failed on sql: {sql}\n{exc}\nunable to rollback"
)
raise ex from inner_exc
ex = DatabaseError(f"Execution failed on sql '{sql}': {exc}")
raise ex from exc

Alright, so now take this line as example:

                sql_statement = f"DROP TABLE {table_name}"
                self.execute(sql_statement)

If we leave it like that ☝️ then an exception is raised because because the cursor was left open:
image

I'm no specialist here but believe that despite all differences between drivers the DBAPI is respected and if that is the case then the SQLite cursor should also be closed. The SQLite cursor will anyways get closed when __del__ is called so we might be safer if we maintain the status quo in case we're uncertain about side-effects ?

Copy link
Author

@gmcrocetti gmcrocetti Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mroeschke , @WillAyd , just a heads up regarding self.pd_sql.execute(stmt).close() vs with self.pd_sql.execute(stmt). The later is not feasible because the SQLite Cursor object does not implement the context manager protocol.
Options are calling .close() or using closing from contextlib.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, so now take this line as example:

                sql_statement = f"DROP TABLE {table_name}"
                self.execute(sql_statement)

If we leave it like that ☝️ then an exception is raised because because the cursor was left open:

Thanks for this example. So previously we were explicitly opening and closing a cursor but with the switch to using self.execute we are re-using the cursor attached to the class instance and not controlling its lifecycle.

So how is that lifecycle being managed? Seems like there is just a gap / inconsistency that is making this all more complicated than it should be

Copy link
Author

@gmcrocetti gmcrocetti Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @WillAyd ,

I don't think we have a lifecycle management problem but nonetheless the implementation has changed as complicated is not the goal here. Thanks for raising it o/
The implementation is back to its original version with a small change in the name of things. cur (instead of conn) is used to represent a cursor since it is what run_transaction returns for SQLiteDatabase's implementation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @WillAyd , it's been a while...
So just passing by to know if there's something else to do here or if it is ok to open for reviews
thanks in advance


def insert_statement(self, *, num_rows: int) -> str:
names = list(map(str, self.frame.columns))
Expand All @@ -2520,8 +2534,13 @@ def insert_statement(self, *, num_rows: int) -> str:
return insert_statement

def _execute_insert(self, conn, keys, data_iter) -> int:
from sqlite3 import DatabaseError as SQLiteDatabaseError

data_list = list(data_iter)
conn.executemany(self.insert_statement(num_rows=1), data_list)
try:
conn.executemany(self.insert_statement(num_rows=1), data_list)
except SQLiteDatabaseError as exc:
raise DatabaseError("Execution failed") from exc
return conn.rowcount

def _execute_insert_multi(self, conn, keys, data_iter) -> int:
Expand Down Expand Up @@ -2643,17 +2662,19 @@ def run_transaction(self):
cur.close()

def execute(self, sql: str | Select | TextClause, params=None):
from sqlite3 import DatabaseError as SQLiteDatabaseError

if not isinstance(sql, str):
raise TypeError("Query must be a string unless using sqlalchemy.")
args = [] if params is None else [params]
cur = self.con.cursor()
try:
cur.execute(sql, *args)
return cur
except Exception as exc:
except SQLiteDatabaseError as exc:
try:
self.con.rollback()
except Exception as inner_exc: # pragma: no cover
except SQLiteDatabaseError as inner_exc: # pragma: no cover
ex = DatabaseError(
f"Execution failed on sql: {sql}\n{exc}\nunable to rollback"
)
Expand Down
Loading