Skip to content

Commit

Permalink
SQLCheckOperator fails if returns dict with any False values (#36273)
Browse files Browse the repository at this point in the history
Co-authored-by: stollefson <[email protected]>
  • Loading branch information
spencertollefson and stollefson authored Dec 19, 2023
1 parent e920344 commit 5c1d8f4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
5 changes: 5 additions & 0 deletions airflow/providers/common/sql/operators/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,8 @@ class SQLCheckOperator(BaseSQLOperator):
The ``SQLCheckOperator`` expects a sql query that will return a single row.
Each value on that first row is evaluated using python ``bool`` casting.
If any of the values return ``False`` the check is failed and errors out.
If a Python dict is returned, and any values in the Python dict are ``False``,
the check is failed and errors out.
Note that Python bool casting evals the following as ``False``:
Expand All @@ -737,6 +739,7 @@ class SQLCheckOperator(BaseSQLOperator):
* Empty string (``""``)
* Empty list (``[]``)
* Empty dictionary or set (``{}``)
* Dictionary with value = ``False`` (``{'DUPLICATE_ID_CHECK': False}``)
Given a query like ``SELECT COUNT(*) FROM foo``, it will fail only if
the count ``== 0``. You can craft much more complex query that could,
Expand Down Expand Up @@ -785,6 +788,8 @@ def execute(self, context: Context):
self.log.info("Record: %s", records)
if not records:
self._raise_exception(f"The following query returned zero rows: {self.sql}")
elif isinstance(records, dict) and not all(records.values()):
self._raise_exception(f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}")
elif not all(records):
self._raise_exception(f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}")

Expand Down
10 changes: 10 additions & 0 deletions tests/providers/common/sql/operators/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,16 @@ def test_execute_not_all_records_are_true(self, mock_get_db_hook):
with pytest.raises(AirflowException, match=r"Test failed."):
self._operator.execute({})

@mock.patch.object(SQLCheckOperator, "get_db_hook")
def test_execute_records_dict_not_all_values_are_true(self, mock_get_db_hook):
mock_get_db_hook.return_value.get_first.return_value = {
"DUPLICATE_ID_CHECK": False,
"NULL_VALUES_CHECK": True,
}

with pytest.raises(AirflowException, match=r"Test failed."):
self._operator.execute({})

@mock.patch.object(SQLCheckOperator, "get_db_hook")
def test_sqlcheckoperator_parameters(self, mock_get_db_hook):
self._operator.execute({})
Expand Down

0 comments on commit 5c1d8f4

Please sign in to comment.