Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement option 'delete_rows' of argument 'if_exists' in 'DataFrame.to_sql' API. #60376

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Other enhancements
- :meth:`Series.map` can now accept kwargs to pass on to func (:issue:`59814`)
- :meth:`pandas.concat` will raise a ``ValueError`` when ``ignore_index=True`` and ``keys`` is not ``None`` (:issue:`59274`)
- :meth:`str.get_dummies` now accepts a ``dtype`` parameter to specify the dtype of the resulting DataFrame (:issue:`47872`)
- Add ``"delete_rows"`` option to ``if_exists`` argument in :meth:`DataFrame.to_sql` deleting all records of the table before inserting data (:issue:`37210`).
- Implemented :meth:`Series.str.isascii` and :meth:`Series.str.isascii` (:issue:`59091`)
- Multiplying two :class:`DateOffset` objects will now raise a ``TypeError`` instead of a ``RecursionError`` (:issue:`59442`)
- Restore support for reading Stata 104-format and enable reading 103-format dta files (:issue:`58554`)
Expand Down
6 changes: 6 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2795,6 +2795,12 @@ def to_sql(
Databases supported by SQLAlchemy [1]_ are supported. Tables can be
newly created, appended to, or overwritten.

.. warning::
The pandas library does not attempt to sanitize inputs provided via a to_sql call.
Please refer to the documentation for the underlying database driver to see if it
will properly prevent injection, or alternatively be advised of a security risk when
executing arbitrary commands in a to_sql call.

Parameters
----------
name : str
Expand Down
90 changes: 70 additions & 20 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@

from sqlalchemy import Table
from sqlalchemy.sql.expression import (
Delete,
Select,
TextClause,
)
Expand Down Expand Up @@ -738,7 +739,7 @@ def to_sql(
name: str,
con,
schema: str | None = None,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label: IndexLabel | None = None,
chunksize: int | None = None,
Expand All @@ -750,6 +751,12 @@ def to_sql(
"""
Write records stored in a DataFrame to a SQL database.

.. warning::
The pandas library does not attempt to sanitize inputs provided via a to_sql call.
Please refer to the documentation for the underlying database driver to see if it
will properly prevent injection, or alternatively be advised of a security risk when
executing arbitrary commands in a to_sql call.

Parameters
----------
frame : DataFrame, Series
Expand All @@ -764,10 +771,11 @@ def to_sql(
schema : str, optional
Name of SQL schema in database to write to (if database flavor
supports this). If None, use default schema (default).
if_exists : {'fail', 'replace', 'append'}, default 'fail'
if_exists : {'fail', 'replace', 'append', 'delete_rows'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- delete_rows: If a table exists, delete all records and insert data.
index : bool, default True
Write DataFrame index as a column.
index_label : str or sequence, optional
Expand Down Expand Up @@ -818,7 +826,7 @@ def to_sql(
`sqlite3 <https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.rowcount>`__ or
`SQLAlchemy <https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount>`__
""" # noqa: E501
if if_exists not in ("fail", "replace", "append"):
if if_exists not in ("fail", "replace", "append", "delete_rows"):
raise ValueError(f"'{if_exists}' is not valid for if_exists")

if isinstance(frame, Series):
Expand Down Expand Up @@ -926,7 +934,7 @@ def __init__(
pandas_sql_engine,
frame=None,
index: bool | str | list[str] | None = True,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
prefix: str = "pandas",
index_label=None,
schema=None,
Expand Down Expand Up @@ -974,11 +982,13 @@ def create(self) -> None:
if self.exists():
if self.if_exists == "fail":
raise ValueError(f"Table '{self.name}' already exists.")
if self.if_exists == "replace":
elif self.if_exists == "replace":
gmcrocetti marked this conversation as resolved.
Show resolved Hide resolved
self.pd_sql.drop_table(self.name, self.schema)
self._execute_create()
elif self.if_exists == "append":
pass
elif self.if_exists == "delete_rows":
self.pd_sql.delete_rows(self.name, self.schema)
else:
raise ValueError(f"'{self.if_exists}' is not valid for if_exists")
else:
Expand All @@ -997,7 +1007,7 @@ def _execute_insert(self, conn, keys: list[str], data_iter) -> int:
Each item contains a list of values to be inserted
"""
data = [dict(zip(keys, row)) for row in data_iter]
result = conn.execute(self.table.insert(), data)
result = self.pd_sql.execute(self.table.insert(), data)
return result.rowcount

def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:
Expand All @@ -1014,7 +1024,7 @@ def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:

data = [dict(zip(keys, row)) for row in data_iter]
stmt = insert(self.table).values(data)
result = conn.execute(stmt)
result = self.pd_sql.execute(stmt)
return result.rowcount

def insert_data(self) -> tuple[list[str], list[np.ndarray]]:
Expand Down Expand Up @@ -1480,7 +1490,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label=None,
schema=None,
Expand Down Expand Up @@ -1649,12 +1659,18 @@ def run_transaction(self):
else:
yield self.con

def execute(self, sql: str | Select | TextClause, params=None):
def execute(self, sql: str | Select | TextClause | Delete, params=None):
"""Simple passthrough to SQLAlchemy connectable"""
args = [] if params is None else [params]
if isinstance(sql, str):
return self.con.exec_driver_sql(sql, *args)
return self.con.execute(sql, *args)
execute_function = self.con.exec_driver_sql
else:
execute_function = self.con.execute

try:
return execute_function(sql, *args)
except Exception as exc:
raise DatabaseError(f"Execution failed on sql '{sql}': {exc}") from exc

def read_table(
self,
Expand Down Expand Up @@ -1866,7 +1882,7 @@ def prep_table(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool | str | list[str] | None = True,
index_label=None,
schema=None,
Expand Down Expand Up @@ -1943,7 +1959,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label=None,
schema: str | None = None,
Expand All @@ -1961,10 +1977,11 @@ def to_sql(
frame : DataFrame
name : string
Name of SQL table.
if_exists : {'fail', 'replace', 'append'}, default 'fail'
if_exists : {'fail', 'replace', 'append', 'delete_rows'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- delete_rows: If a table exists, delete all records and insert data.
index : boolean, default True
Write DataFrame index as a column.
index_label : string or sequence, default None
Expand Down Expand Up @@ -2061,6 +2078,18 @@ def drop_table(self, table_name: str, schema: str | None = None) -> None:
self.get_table(table_name, schema).drop(bind=self.con)
self.meta.clear()

def delete_rows(self, table_name: str, schema: str | None = None) -> None:
schema = schema or self.meta.schema
if self.has_table(table_name, schema):
self.meta.reflect(
bind=self.con, only=[table_name], schema=schema, views=True
)
with self.run_transaction():
table = self.get_table(table_name, schema)
self.execute(table.delete())

self.meta.clear()

def _create_sql_schema(
self,
frame: DataFrame,
Expand Down Expand Up @@ -2296,7 +2325,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label=None,
schema: str | None = None,
Expand All @@ -2318,6 +2347,7 @@ def to_sql(
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- delete_rows: If a table exists, delete all records and insert data.
index : boolean, default True
Write DataFrame index as a column.
index_label : string or sequence, default None
Expand Down Expand Up @@ -2368,6 +2398,9 @@ def to_sql(
cur.execute(f"DROP TABLE {table_name}")
elif if_exists == "append":
mode = "append"
elif if_exists == "delete_rows":
mode = "append"
self.delete_rows(name, schema)

import pyarrow as pa

Expand All @@ -2377,9 +2410,12 @@ def to_sql(
raise ValueError("datatypes not supported") from exc

with self.con.cursor() as cur:
total_inserted = cur.adbc_ingest(
table_name=name, data=tbl, mode=mode, db_schema_name=schema
)
try:
total_inserted = cur.adbc_ingest(
table_name=name, data=tbl, mode=mode, db_schema_name=schema
)
except Exception as exc:
raise DatabaseError("Execution failed") from exc

self.con.commit()
return total_inserted
Expand All @@ -2402,6 +2438,11 @@ def has_table(self, name: str, schema: str | None = None) -> bool:

return False

def delete_rows(self, name: str, schema: str | None = None) -> None:
table_name = f"{schema}.{name}" if schema else name
if self.has_table(name, schema):
self.execute(f"DELETE FROM {table_name}").close()

def _create_sql_schema(
self,
frame: DataFrame,
Expand Down Expand Up @@ -2521,7 +2562,10 @@ def insert_statement(self, *, num_rows: int) -> str:

def _execute_insert(self, conn, keys, data_iter) -> int:
data_list = list(data_iter)
conn.executemany(self.insert_statement(num_rows=1), data_list)
try:
conn.executemany(self.insert_statement(num_rows=1), data_list)
except Exception as exc:
raise DatabaseError("Execution failed") from exc
return conn.rowcount

def _execute_insert_multi(self, conn, keys, data_iter) -> int:
Expand Down Expand Up @@ -2769,10 +2813,11 @@ def to_sql(
frame: DataFrame
name: string
Name of SQL table.
if_exists: {'fail', 'replace', 'append'}, default 'fail'
if_exists: {'fail', 'replace', 'append', 'delete_rows'}, default 'fail'
fail: If table exists, do nothing.
replace: If table exists, drop it, recreate it, and insert data.
append: If table exists, insert data. Create if it does not exist.
delete_rows: If a table exists, delete all records and insert data.
gmcrocetti marked this conversation as resolved.
Show resolved Hide resolved
index : bool, default True
Write DataFrame index as a column
index_label : string or sequence, default None
Expand Down Expand Up @@ -2848,6 +2893,11 @@ def drop_table(self, name: str, schema: str | None = None) -> None:
drop_sql = f"DROP TABLE {_get_valid_sqlite_name(name)}"
self.execute(drop_sql)

def delete_rows(self, name: str, schema: str | None = None) -> None:
delete_sql = f"DELETE FROM {_get_valid_sqlite_name(name)}"
if self.has_table(name, schema):
self.execute(delete_sql)

def _create_sql_schema(
self,
frame,
Expand Down
60 changes: 57 additions & 3 deletions pandas/tests/io/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,9 @@ def test_to_sql(conn, method, test_frame1, request):


@pytest.mark.parametrize("conn", all_connectable)
@pytest.mark.parametrize("mode, num_row_coef", [("replace", 1), ("append", 2)])
@pytest.mark.parametrize(
"mode, num_row_coef", [("replace", 1), ("append", 2), ("delete_rows", 1)]
)
def test_to_sql_exist(conn, mode, num_row_coef, test_frame1, request):
conn = request.getfixturevalue(conn)
with pandasSQL_builder(conn, need_transaction=True) as pandasSQL:
Expand Down Expand Up @@ -2698,6 +2700,58 @@ def test_drop_table(conn, request):
assert not insp.has_table("temp_frame")


@pytest.mark.parametrize("conn_name", all_connectable)
def test_delete_rows_success(conn_name, test_frame1, request):
table_name = "temp_frame"
conn = request.getfixturevalue(conn_name)

with pandasSQL_builder(conn) as pandasSQL:
with pandasSQL.run_transaction():
assert pandasSQL.to_sql(test_frame1, table_name) == test_frame1.shape[0]

with pandasSQL.run_transaction():
assert pandasSQL.delete_rows(table_name) is None

assert count_rows(conn, table_name) == 0
assert pandasSQL.has_table("temp_frame")


@pytest.mark.parametrize("conn_name", all_connectable)
def test_delete_rows_is_atomic(conn_name, request):
sqlalchemy = pytest.importorskip("sqlalchemy")

table_name = "temp_frame"
table_stmt = f"CREATE TABLE {table_name} (a INTEGER, b INTEGER UNIQUE NOT NULL)"

if conn_name != "sqlite_buildin" and "adbc" not in conn_name:
table_stmt = sqlalchemy.text(table_stmt)

# setting dtype is mandatory for adbc related tests
original_df = DataFrame({"a": [1, 2], "b": [3, 4]}, dtype="int32")
replacing_df = DataFrame({"a": [5, 6, 7], "b": [8, 8, 8]}, dtype="int32")

conn = request.getfixturevalue(conn_name)
pandasSQL = pandasSQL_builder(conn)

with pandasSQL.run_transaction() as cur:
cur.execute(table_stmt)

with pandasSQL.run_transaction():
pandasSQL.to_sql(original_df, table_name, if_exists="append", index=False)

# inserting duplicated values in a UNIQUE constraint column
with pytest.raises(pd.errors.DatabaseError):
with pandasSQL.run_transaction():
pandasSQL.to_sql(
replacing_df, table_name, if_exists="delete_rows", index=False
)

# failed "delete_rows" is rolled back preserving original data
with pandasSQL.run_transaction():
result_df = pandasSQL.read_query(f"SELECT * FROM {table_name}", dtype="int32")
tm.assert_frame_equal(result_df, original_df)


@pytest.mark.parametrize("conn", all_connectable)
gmcrocetti marked this conversation as resolved.
Show resolved Hide resolved
def test_roundtrip(conn, request, test_frame1):
if conn == "sqlite_str":
Expand Down Expand Up @@ -3409,8 +3463,8 @@ def test_to_sql_with_negative_npinf(conn, request, input):
mark = pytest.mark.xfail(reason="GH 36465")
request.applymarker(mark)

msg = "inf cannot be used with MySQL"
with pytest.raises(ValueError, match=msg):
msg = "Execution failed on sql"
with pytest.raises(pd.errors.DatabaseError, match=msg):
df.to_sql(name="foobar", con=conn, index=False)
else:
assert df.to_sql(name="foobar", con=conn, index=False) == 1
Expand Down
Loading