diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md
index 2f9577a..bebac14 100644
--- a/CONTRIBUTORS.md
+++ b/CONTRIBUTORS.md
@@ -14,6 +14,7 @@ Thanks to the contributors who helped on this project apart from the authors
* [Vigneshwarr Venkatesan](https://www.linkedin.com/in/vignesh15)
* [Nishant Singh](https://www.linkedin.com/in/singh-nishant/)
* [Amaldev Kunnel](https://www.linkedin.com/in/amaldev-k-40222680)
+* [Raghavendra H S](https://www.linkedin.com/in/raghavendra-h-s-01786332/)
* [Sudeepta pal](https://www.linkedin.com/in/sudeepta-pal-98b393217/)
* [Mallikarjunudu Tirumalasetti](https://www.linkedin.com/in/mtirumal/)
* [Tadakala sai vamsi goud](https://www.linkedin.com/in/sai-vamsi-goud-455737169/)
diff --git a/docs/delta.md b/docs/delta.md
index 2db8d45..d68abaa 100644
--- a/docs/delta.md
+++ b/docs/delta.md
@@ -19,6 +19,7 @@ builder = (
.config("spark.sql.warehouse.dir", "/tmp/hive/warehouse")
.config("spark.driver.extraJavaOptions", "-Dderby.system.home=/tmp/derby")
.config("spark.jars.ivy", "/tmp/ivy2")
+ .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
)
spark = builder.getOrCreate()
```
diff --git a/docs/getting-started/setup.md b/docs/getting-started/setup.md
index e77435c..df4827f 100644
--- a/docs/getting-started/setup.md
+++ b/docs/getting-started/setup.md
@@ -156,29 +156,30 @@ product_id string, -- (2)!
table_name string, -- (3)!
rule_type string, -- (4)!
rule string, -- (5)!
-source_expectations string, -- (6)!
-tag string, -- (7)!
-description string, -- (8)!
-source_dq_status string, -- (9)!
-source_dq_actual_outcome string, -- (10)!
-source_dq_expected_outcome string, -- (11)!
-source_dq_actual_row_count string, -- (12)!
-source_dq_error_row_count string, -- (13)!
-source_dq_row_count string, -- (14)!
-source_dq_start_time string, -- (15)!
-source_dq_end_time string, -- (16)!
-target_expectations string, -- (17)!
-target_dq_status string, -- (18)!
-target_dq_actual_outcome string, -- (19)!
-target_dq_expected_outcome string, -- (20)!
-target_dq_actual_row_count string, -- (21)!
-target_dq_error_row_count string, -- (22)!
-target_dq_row_count string, -- (23)!
-target_dq_start_time string, -- (24)!
-target_dq_end_time string, -- (25)!
-dq_date date, -- (26)!
-dq_time string, -- (27)!
-dq_job_metadata_info string, -- (28)!
+column_name string, -- (6)!
+source_expectations string, -- (7)!
+tag string, -- (8)!
+description string, -- (9)!
+source_dq_status string, -- (10)!
+source_dq_actual_outcome string, -- (11)!
+source_dq_expected_outcome string, -- (12)!
+source_dq_actual_row_count string, -- (13)!
+source_dq_error_row_count string, -- (14)!
+source_dq_row_count string, -- (15)!
+source_dq_start_time string, -- (16)!
+source_dq_end_time string, -- (17)!
+target_expectations string, -- (18)!
+target_dq_status string, -- (19)!
+target_dq_actual_outcome string, -- (20)!
+target_dq_expected_outcome string, -- (21)!
+target_dq_actual_row_count string, -- (22)!
+target_dq_error_row_count string, -- (23)!
+target_dq_row_count string, -- (24)!
+target_dq_start_time string, -- (25)!
+target_dq_end_time string, -- (26)!
+dq_date date, -- (27)!
+dq_time string, -- (28)!
+dq_job_metadata_info string, -- (29)!
);
```
@@ -187,26 +188,27 @@ dq_job_metadata_info string, -- (28)!
3. `table_name` The target table where the final data gets inserted
4. `rule_type` Either row/query/agg dq
5. `rule` Rule name
-6. `source_expectations` Actual Rule to be executed on the source dq
-7. `tag` completeness,uniqueness,validity,accuracy,consistency,
-8. `description` Description of the Rule
-9. `source_dq_status` Status of the rule execution in the Source dq
-10. `source_dq_actual_outcome` Actual outcome of the Source dq check
-11. `source_dq_expected_outcome` Expected outcome of the Source dq check
-12. `source_dq_actual_row_count` Number of rows of the source dq
-13. `source_dq_error_row_count` Number of rows failed in the source dq
-14. `source_dq_row_count` Number of rows of the source dq
-15. `source_dq_start_time` source dq start timestamp
-16. `source_dq_end_time` source dq end timestamp
-17. `target_expectations` Actual Rule to be executed on the target dq
-18. `target_dq_status` Status of the rule execution in the Target dq
-19. `target_dq_actual_outcome` Actual outcome of the Target dq check
-20. `target_dq_expected_outcome` Expected outcome of the Target dq check
-21. `target_dq_actual_row_count` Number of rows of the target dq
-22. `target_dq_error_row_count` Number of rows failed in the target dq
-23. `target_dq_row_count` Number of rows of the target dq
-24. `target_dq_start_time` target dq start timestamp
-25. `target_dq_end_time` target dq end timestamp
-26. `dq_date` Dq executed date
-27. `dq_time` Dq executed timestamp
-28. `dq_job_metadata_info` dq job metadata
+6. `column_name` column name where the rule got executed
+7. `source_expectations` Actual Rule to be executed on the source dq
+8. `tag` completeness,uniqueness,validity,accuracy,consistency,
+9. `description` Description of the Rule
+10. `source_dq_status` Status of the rule execution in the Source dq
+11. `source_dq_actual_outcome` Actual outcome of the Source dq check
+12. `source_dq_expected_outcome` Expected outcome of the Source dq check
+13. `source_dq_actual_row_count` Number of rows of the source dq
+14. `source_dq_error_row_count` Number of rows failed in the source dq
+15. `source_dq_row_count` Number of rows of the source dq
+16. `source_dq_start_time` source dq start timestamp
+17. `source_dq_end_time` source dq end timestamp
+18. `target_expectations` Actual Rule to be executed on the target dq
+19. `target_dq_status` Status of the rule execution in the Target dq
+20. `target_dq_actual_outcome` Actual outcome of the Target dq check
+21. `target_dq_expected_outcome` Expected outcome of the Target dq check
+22. `target_dq_actual_row_count` Number of rows of the target dq
+23. `target_dq_error_row_count` Number of rows failed in the target dq
+24. `target_dq_row_count` Number of rows of the target dq
+25. `target_dq_start_time` target dq start timestamp
+26. `target_dq_end_time` target dq end timestamp
+27. `dq_date` Dq executed date
+28. `dq_time` Dq executed timestamp
+29. `dq_job_metadata_info` dq job metadata
diff --git a/spark_expectations/config/user_config.py b/spark_expectations/config/user_config.py
index 803968f..f7dd211 100644
--- a/spark_expectations/config/user_config.py
+++ b/spark_expectations/config/user_config.py
@@ -5,6 +5,7 @@
class Constants:
# declare const user config variables for email notification
se_notifications_enable_email = "spark.expectations.notifications.email.enabled"
+ se_enable_observability = "spark.expectations.observability.enabled"
se_notifications_enable_custom_email_body = (
"spark.expectations.notifications.enable.custom.email.body"
)
@@ -16,7 +17,7 @@ class Constants:
)
se_notifications_email_from = "spark.expectations.notifications.email.from"
se_notifications_email_to_other_mail_id = (
- "spark.expectations.notifications.email.to.other.mail.com"
+ "spark.expectations.notifications.email.to.other.mail.com"
)
se_notifications_email_subject = "spark.expectations.notifications.email.subject"
se_notifications_email_custom_body = (
diff --git a/spark_expectations/core/context.py b/spark_expectations/core/context.py
index 6c35155..e0ae1af 100644
--- a/spark_expectations/core/context.py
+++ b/spark_expectations/core/context.py
@@ -25,6 +25,7 @@ def __post_init__(self) -> None:
self._run_id: str = f"{self.product_id}_{uuid1()}"
self._run_date: str = self.set_run_date()
self._dq_stats_table_name: Optional[str] = None
+ self._dq_stats_report_table_name: Optional[str] = None
self._dq_detailed_stats_table_name: Optional[str] = None
self._final_table_name: Optional[str] = None
self._error_table_name: Optional[str] = None
@@ -142,6 +143,7 @@ def __post_init__(self) -> None:
self._target_and_error_table_writer_config: dict = {}
self._stats_table_writer_config: dict = {}
+ self._report_table_config: dict = {}
# The below config is user config and will be enabled if detailed result is required for agg and query dq
self._enable_agg_dq_detailed_result: bool = False
@@ -196,6 +198,25 @@ def get_dq_stats_table_name(self) -> str:
"""The spark expectations context is not set completely, please assign '_dq_stats_table_name' before
accessing it"""
)
+ def set_dq_stats_report_table_name(self, dq_stats_report_table_name: str) -> None:
+ self._dq_stats_report_table_name = dq_stats_report_table_name
+
+
+
+ @property
+ def get_dq_stats_report_table_name(self) -> str:
+ """
+ Get dq_stats_report_table_name to which the DQ stats report data will be written
+
+ Returns:
+ str: returns the dq_stats_table_name
+ """
+ if self._dq_stats_report_table_name:
+ return self._dq_stats_report_table_name
+ raise SparkExpectationsMiscException(
+ """The spark expectations context is not set completely, please assign '_dq_stats_report_table_name' before
+ accessing it"""
+ )
def set_dq_expectations(self, dq_expectations: dict) -> None:
self._dq_expectations = dq_expectations
@@ -1844,7 +1865,7 @@ def set_dq_detailed_stats_table_name(
self._dq_detailed_stats_table_name = dq_detailed_stats_table_name
@property
- def get_dq_detailed_stats_table_name(self) -> str:
+ def get_dq_detailed_stats_table_name(self) -> str:
"""
Get dq_stats_table_name to which the final stats of the dq job will be written into
@@ -1885,6 +1906,23 @@ def get_query_dq_output_custom_table_name(self) -> str:
'_dq_detailed_stats_table_name,query_dq_detailed_stats_status' before
accessing it"""
)
+ def set_report_table_config(self, config: dict) -> None:
+ """
+ This function sets report table config
+ Args:
+ config: dict
+ Returns: None
+ """
+ self._report_table_config = config
+
+ @property
+ def get_report_table_config(self) -> dict:
+ """
+ This function returns report table config
+ Returns:
+ dict: Returns report_table_config which in dict
+ """
+ return self._report_table_config
def set_detailed_stats_table_writer_config(self, config: dict) -> None:
"""
@@ -2062,3 +2100,7 @@ def get_stats_dict(self) -> Optional[List[Dict[str, Any]]]:
Optional[List[Dict[str, Any]]]: Returns the stats_dict if it exists, otherwise None
"""
return self._stats_dict if hasattr(self, "_stats_dict") else None
+
+ @property
+ def report_table_config(self) -> dict:
+ return self._report_table_config  # alias of get_report_table_config
diff --git a/spark_expectations/core/expectations.py b/spark_expectations/core/expectations.py
index cb2fcca..6ab6bae 100644
--- a/spark_expectations/core/expectations.py
+++ b/spark_expectations/core/expectations.py
@@ -358,7 +358,12 @@ def _except(func: Any) -> Any:
else False
)
- _job_metadata: str = user_config.se_job_metadata
+ # Pull job metadata from the notification dict; None when absent or non-string.
+ _job_metadata: str = (
+ str(_notification_dict.get(user_config.se_job_metadata))
+ if isinstance(_notification_dict.get(user_config.se_job_metadata), str)
+ else None
+ )
notifications_on_error_drop_threshold = _notification_dict.get(
user_config.se_notifications_on_error_drop_threshold, 100
diff --git a/spark_expectations/examples/base_setup.py b/spark_expectations/examples/base_setup.py
index 3527b0d..33f82e7 100644
--- a/spark_expectations/examples/base_setup.py
+++ b/spark_expectations/examples/base_setup.py
@@ -134,6 +134,7 @@ def set_up_delta() -> SparkSession:
.config("spark.sql.warehouse.dir", "/tmp/hive/warehouse")
.config("spark.driver.extraJavaOptions", "-Dderby.system.home=/tmp/derby")
.config("spark.jars.ivy", "/tmp/ivy2")
+ .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
)
spark = builder.getOrCreate()
diff --git a/spark_expectations/examples/sample_dq_delta.py b/spark_expectations/examples/sample_dq_delta.py
index 1bad471..78d627c 100644
--- a/spark_expectations/examples/sample_dq_delta.py
+++ b/spark_expectations/examples/sample_dq_delta.py
@@ -15,8 +15,9 @@
spark = set_up_delta()
dic_job_info = {
- "job": "job_name",
+ "job": "na_CORL_DIGITAL_source_to_o9",
"Region": "NA",
+ "env": "dev",
"Snapshot": "2024-04-15",
}
job_info = str(dic_job_info)
@@ -32,12 +33,13 @@
)
user_conf = {
- user_config.se_notifications_enable_email: True,
- user_config.se_notifications_enable_custom_email_body: True,
- user_config.se_notifications_email_smtp_host: "mailhost.com",
- user_config.se_notifications_email_smtp_port: 25,
- user_config.se_notifications_email_from: "",
- user_config.se_notifications_email_to_other_mail_id: "",
+ user_config.se_enable_observability: "Enable",
+ user_config.se_notifications_enable_email: False,
+ user_config.se_notifications_enable_custom_email_body: False,
+ user_config.se_notifications_email_smtp_host: "smtp.office365.com",
+ user_config.se_notifications_email_smtp_port: 587,
+ user_config.se_notifications_email_from: "",
+ user_config.se_notifications_email_to_other_mail_id: "",
user_config.se_notifications_email_subject: "spark expectations - data quality - notifications",
user_config.se_notifications_email_custom_body: """Spark Expectations Statistics for this dq run:
'product_id': {},
@@ -116,12 +118,16 @@ def build_new() -> DataFrame:
spark.sql("select * from dq_spark_dev.dq_stats").show(truncate=False)
spark.sql("select * from dq_spark_dev.dq_stats_detailed").show(truncate=False)
spark.sql("select * from dq_spark_dev.dq_stats_querydq_output").show(truncate=False)
- spark.sql("select * from dq_spark_dev.dq_stats").printSchema()
- spark.sql("select * from dq_spark_dev.dq_stats_detailed").printSchema()
- spark.sql("select * from dq_spark_dev.customer_order").show(truncate=False)
+ _log.info("BELOW IS THE REPORT TABLE")
+
+ spark.sql("select * from dq_spark_dev.dq_obs_report_data").show(truncate=False)
+
# spark.sql("select count(*) from dq_spark_local.customer_order_error ").show(
# truncate=False
# )
+ if user_conf.get(user_config.se_enable_observability) == "Enable":
+ _log.info("alert sent successfully")
+
_log.info("stats data in the kafka topic")
# display posted statistics from the kafka topic
diff --git a/spark_expectations/sinks/utils/writer.py b/spark_expectations/sinks/utils/writer.py
index d31fbf5..9203966 100644
--- a/spark_expectations/sinks/utils/writer.py
+++ b/spark_expectations/sinks/utils/writer.py
@@ -23,6 +23,7 @@
SparkExpectationsMiscException,
)
from spark_expectations.secrets import SparkExpectationsSecretsBackend
+from spark_expectations.utils.alert import AlertTrial
from spark_expectations.utils.udf import remove_empty_maps
from spark_expectations.core.context import SparkExpectationsContext
from spark_expectations.sinks import _sink_hook
@@ -135,6 +136,7 @@ def get_row_dq_detailed_stats(
str,
str,
str,
+ str,
None,
None,
int,
@@ -176,6 +178,7 @@ def get_row_dq_detailed_stats(
for _rowdq_rule in _row_dq_expectations:
# if _rowdq_rule["rule"] in _dq_res:
+
failed_row_count = _dq_res[_rowdq_rule["rule"]]
_row_dq_result.append(
(
@@ -184,6 +187,7 @@ def get_row_dq_detailed_stats(
_table_name,
_rowdq_rule["rule_type"],
_rowdq_rule["rule"],
+ _rowdq_rule["column_name"],
_rowdq_rule["expectation"],
_rowdq_rule["tag"],
_rowdq_rule["description"],
@@ -322,6 +326,7 @@ def _prep_secondary_query_output(self) -> DataFrame:
"product_id",
"table_name",
"rule",
+ "column_name",
"alias",
"dq_type",
"source_dq",
@@ -354,7 +359,8 @@ def _prep_secondary_query_output(self) -> DataFrame:
+ "target.source_dq as target_output from _df_custom_detailed_stats_source as source "
+ "left outer join _df_custom_detailed_stats_source as target "
+ "on source.run_id=target.run_id and source.product_id=target.product_id and "
- + "source.table_name=target.table_name and source.rule=target.rule "
+ + "source.table_name=target.table_name and source.rule=target.rule and "
+ + "source.column_name = target.column_name and source.dq_type = target.dq_type "
+ "and source.alias_comp=target.alias_comp "
+ "and source.compare = 'source' and target.compare = 'target' "
)
@@ -395,6 +401,7 @@ def _prep_detailed_stats(
"table_name",
"rule_type",
"rule",
+ "column_name",
"source_expectations",
"tag",
"description",
@@ -415,6 +422,7 @@ def _prep_detailed_stats(
"table_name",
"rule_type",
"rule",
+ "column_name",
"target_expectations",
"tag",
"description",
@@ -483,7 +491,7 @@ def _prep_detailed_stats(
_df_detailed_stats = _df_source_aggquery_detailed_stats.join(
_df_target_aggquery_detailed_stats,
- ["run_id", "product_id", "table_name", "rule_type", "rule"],
+ ["run_id", "product_id", "table_name", "rule_type", "rule", "column_name"],
"full_outer",
)
@@ -585,6 +593,21 @@ def write_detailed_stats(self) -> None:
raise SparkExpectationsMiscException(
f"error occurred while saving the data into the stats table {e}"
)
+ _log.info("detailed stats write complete; checking observability configuration")
+ # Generate the DQ observability report when the feature is enabled.
+ # Call the dq_obs_report_data_insert method from report.py
+ if user_config.se_enable_observability == "spark.expectations.observability.enabled":  # FIXME(review): always True — gate on the user-supplied config value, not the constant key
+ # Build the observability report and alert it to subscribers.
+ # AlertTrial is already imported at module level; avoid re-importing locally.
+ alert = AlertTrial(self._context)
+ alert.get_report_data(
+ _df_detailed_stats,
+ _df_custom_detailed_stats_source,
+ )
+
+
+
+
def write_error_stats(self) -> None:
"""
@@ -605,6 +628,7 @@ def write_error_stats(self) -> None:
input_count: int = self._context.get_input_count
error_count: int = self._context.get_error_count
output_count: int = self._context.get_output_count
+
source_agg_dq_result: Optional[
List[Dict[str, str]]
] = self._context.get_source_agg_dq_result
@@ -760,8 +784,8 @@ def write_error_stats(self) -> None:
.withColumn("success_percentage", sql_round(df.success_percentage, 2))
.withColumn("error_percentage", sql_round(df.error_percentage, 2))
)

self._context.set_stats_dict(df)
_log.info(
"Writing metrics to the stats table: %s, started",
self._context.get_dq_stats_table_name,
diff --git a/spark_expectations/utils/actions.py b/spark_expectations/utils/actions.py
index 0509aa5..9a45327 100644
--- a/spark_expectations/utils/actions.py
+++ b/spark_expectations/utils/actions.py
@@ -129,6 +129,7 @@ def agg_query_dq_detailed_result(
str,
Any,
str,
+ str,
dict,
str,
]
@@ -328,20 +329,24 @@ def agg_query_dq_detailed_result(
)
):
for _key, _querydq_query in sub_key_value.items():
- _querydq_df = _context.spark.sql(_dq_rule["expectation" + "_" + _key])
querydq_output.append(
(
_context.get_run_id,
_dq_rule["product_id"],
_dq_rule["table_name"],
_dq_rule["rule"],
+ _dq_rule["column_name"],
_key,
_query_prefix,
dict(
[
(
_key,
- [row.asDict() for row in _querydq_df.collect()],
+ _context.spark.sql(
+ _dq_rule["expectation" + "_" + _key]
+ )
+ .toJSON()
+ .collect(),
)
]
),
@@ -421,6 +426,7 @@ def execute_sql_and_get_result(
_dq_rule["table_name"],
_dq_rule["rule_type"],
_dq_rule["rule"],
+ _dq_rule["column_name"],
_dq_rule["expectation"],
_dq_rule["tag"],
_dq_rule["description"],
diff --git a/spark_expectations/utils/alert.py b/spark_expectations/utils/alert.py
new file mode 100644
index 0000000..230fad8
--- /dev/null
+++ b/spark_expectations/utils/alert.py
@@ -0,0 +1,117 @@
+import os
+from dataclasses import dataclass
+from typing import Dict, Tuple
+from pyspark.sql import SparkSession, DataFrame
+import smtplib
+import traceback
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from os import getenv
+from jinja2 import Environment, FileSystemLoader
+import re
+
+from pyspark.sql.types import StructType, StructField, StringType
+
+from spark_expectations.core.context import SparkExpectationsContext
+
+@dataclass
+class AlertTrial:
+ """
+ This class implements the alert trial functionality.
+ """
+ _context: SparkExpectationsContext
+
+ def __post_init__(self) -> None:
+ self.spark = self._context.spark
+
+ def send_mail(self, body: str, subject: str, receivers_list: str) -> None:
+ """
+ This function is to send the DQ report to the users.
+
+ Args:
+ body: Email body.
+ subject: Email subject.
+ receivers_list: List of email receivers.
+ """
+ try:
+ cerberus_url = 'https://prod.cerberus.nikecloud.com/'
+ cerberus_sdb_path = "app/if-common/smtp"
+ smtp_details = {getenv('SMTP_USER', ''): getenv('SMTP_PASSWORD', '')}  # never commit credentials; read them from the environment
+
+ SMTP_USER_NAME = list(smtp_details.keys())[0]
+ service_account_email = f"{SMTP_USER_NAME}@nike.com"
+ service_account_password = smtp_details.get(SMTP_USER_NAME)
+ body = MIMEText(body, 'html')
+ msg = MIMEMultipart()
+ msg.attach(body)
+ msg['Subject'] = subject
+ msg['From'] = service_account_email
+ msg['To'] = receivers_list
+
+ smtp_host = getenv('SMTP_HOST') or "smtp.office365.com"
+ smtp_port = getenv('SMTP_PORT') or 587
+
+ with smtplib.SMTP(smtp_host, port=smtp_port) as smtp_server:
+ smtp_server.ehlo()
+ smtp_server.starttls()
+ smtp_server.login(service_account_email, service_account_password)
+ smtp_server.sendmail(msg['From'], receivers_list.split(','), msg.as_string())
+ print("Report sent successfully!")
+ except Exception as e:
+ print(f"Error in send_mail: {e}")
+ traceback.print_exc()
+
+ def get_report_data(self, df_detailed: DataFrame, df_query_output: DataFrame) -> None:
+ """
+ This function calls the dq_obs_report_data_insert method from SparkExpectationsReport.
+
+ Args:
+ df_detailed: Detailed DataFrame.
+ df_query_output: Query output DataFrame.
+ """
+ try:
+ from spark_expectations.utils.report import SparkExpectationsReport
+
+ report = SparkExpectationsReport(self._context)
+ df = report.dq_obs_report_data_insert(df_detailed, df_query_output)
+ # NOTE: dq_obs_report_data is already persisted inside dq_obs_report_data_insert.
+
+ print("success lets redesign the report")
+ df.show()
+ template_dir = os.path.join(os.path.dirname(__file__), 'templates')
+ env_loader = Environment(loader=FileSystemLoader(template_dir))
+ template = env_loader.get_template('advanced_email_alert_template.jinja')
+ df_data = [row.asDict() for row in df.collect()]
+ headers = list(df.columns)
+ rows = [row.asDict().values() for row in df.collect()]
+ print("df_data")
+ print(df_data)
+
+ html_output = template.render(
+ title='central_repo_test_table',
+ columns=headers,
+ table_rows=rows,
+ product_id='12345',
+ data_object_name='Sample Data Object',
+ snapshot_date='2023-10-01',
+ region_code='US',
+ dag_name='Sample DAG',
+ run_id='run_12345',
+ overall_status='Pass',
+ overall_status_bgcolor='#00FF00',
+ total_rules_executed=10,
+ total_passed_rules=9,
+ total_failed_rules=1,
+ competency_metrics_slack=[],
+ competency_metrics=[],
+ criticality_metrics=[]
+ )
+ print("calling the send mail to the users")
+ # self.send_mail(html_output, "test", "aaaalfyofqi7i7nxuvxlboxbym@nike.org.slack.com")
+
+ print("print the html data")
+ print(html_output)
+
+ except Exception as e:
+ print(f"Error in get_report_data: {e}")
+ traceback.print_exc()
\ No newline at end of file
diff --git a/spark_expectations/utils/report.py b/spark_expectations/utils/report.py
new file mode 100644
index 0000000..fcc71c4
--- /dev/null
+++ b/spark_expectations/utils/report.py
@@ -0,0 +1,90 @@
+
+from dataclasses import dataclass
+from typing import Dict, Optional, Tuple, List
+from datetime import datetime, timezone
+from pyspark.sql import DataFrame
+from pyspark.sql.functions import (
+ lit,
+ expr,
+ when,
+ array,
+ to_timestamp,
+ round as sql_round,
+ create_map,
+ explode,
+ to_json,
+ col,
+ split,
+ current_date,
+ get_json_object
+)
+from pyspark.sql.types import StructType
+from spark_expectations import _log
+from spark_expectations.core.exceptions import (
+ SparkExpectationsUserInputOrConfigInvalidException,
+ SparkExpectationsMiscException,
+)
+from spark_expectations.secrets import SparkExpectationsSecretsBackend
+from spark_expectations.utils.udf import remove_empty_maps
+from spark_expectations.core.context import SparkExpectationsContext
+from spark_expectations.sinks import _sink_hook
+from spark_expectations.config.user_config import Constants as user_config
+
+@dataclass
+class SparkExpectationsReport:
+ """
+ This class implements/supports writing data into the sink system
+ """
+
+ _context: SparkExpectationsContext
+
+ def __post_init__(self) -> None:
+ self.spark = self._context.spark
+
+ def dq_obs_report_data_insert(self, df_detailed: DataFrame, df_query_output: DataFrame):
+ try:
+ print("dq_obs_report_data_insert method called stats_detailed table")
+ # df_detailed.show(truncate=False)
+ df = df_detailed
+ # List of columns to be removed
+ columns_to_remove = [
+ "target_dq_status",
+ "source_expectations",
+ "source_dq_actual_outcome",
+ "source_dq_expected_outcome",
+ "source_dq_start_time",
+ "source_dq_end_time",
+ "target_expectations",
+ "target_dq_actual_outcome",
+ "target_dq_expected_outcome",
+ "target_dq_actual_row_count",
+ "target_dq_error_row_count",
+ "target_dq_row_count",
+ "target_dq_start_time",
+ "target_dq_end_time",
+ "dq_job_metadata_info"
+ ]
+ # Rename the columns
+ df = df.withColumnRenamed("source_dq_row_count", "total_records") \
+ .withColumnRenamed("source_dq_error_row_count", "failed_records") \
+ .withColumnRenamed("source_dq_actual_row_count", "valid_records")
+ df = df.withColumn("dag_name", get_json_object(col("dq_job_metadata_info"), "$.job")) \
+ .withColumn("Region_cd", get_json_object(col("dq_job_metadata_info"), "$.Region")) \
+ .withColumn("env", get_json_object(col("dq_job_metadata_info"), "$.env"))
+
+ # Calculate the success percentage and add it as a new column
+ df = df.withColumn("success_percentage", (col("valid_records") / col("total_records")) * 100)
+
+ # Create a new DataFrame by dropping the specified columns
+ print("This is the table ")
+ new_df = df.drop(*columns_to_remove)
+ # Save the DataFrame to a table
+ new_df.write.mode("overwrite").saveAsTable("dq_obs_report_data")
+ return new_df
+ except Exception as e:
+ raise SparkExpectationsMiscException(
+ f"An error occurred in dq_obs_report_data_insert: {e}"
+ )
+
+
+
diff --git a/spark_expectations/utils/templates/advanced_email_alert_template.jinja b/spark_expectations/utils/templates/advanced_email_alert_template.jinja
new file mode 100644
index 0000000..dfc2258
--- /dev/null
+++ b/spark_expectations/utils/templates/advanced_email_alert_template.jinja
@@ -0,0 +1,228 @@
+
+
+
+
+
+
+ {{ title }}
+
+
+
+
+
+
+ {%if communication_engine == 'outlook' %}
+
+ {% else %}
+
+ {% endif %}
+
+
+
+ {% if persona == 'engineer' %}
+ Summary of Data quality execution for the product id : {{product_id}}
+ {%else%}
+ Summary of Data quality execution for the data product {{data_object_name}}
+ {%endif%}
+ Snapshot Date : {{ snapshot_date }}
+ Region Code : {{ region_code }}
+ {% if persona == 'engineer' %}
+ DAG Name : {{ dag_name }}
+ run_id : {{ run_id }}
+ {% endif %}
+
+ |
+ {% if (communication_engine == 'outlook') %}
+
+ Overall Status
+
+ DQ Rules(Pass/Total) - {{ total_passed_rules }} / {{ total_rules_executed }}
+ {% else %}
+ |
+ Overall Status
+ {{overall_status}}
+ Total DQ Rules : {{ total_rules_executed }}
+ Passed : {{ total_passed_rules }} / Failed : {{ total_failed_rules }}
+ |
+ {% endif %}
+
+
+
+
+ {% if persona == 'engineer' %}
+
+ {% if communication_engine == 'outlook' %}
+
+ {% else %}
+
+ {% endif %}
+
+
+
+
+
+ Summary of Data Quality Competency
+
+ {% if communication_engine == 'outlook' %}
+
+ {% for metric in competency_metrics %}
+
+
+ {{ metric.name }} {{ metric.description }}
+ |
+ {% endfor %}
+
+ {% else %}
+
+ {% for criticality in competency_metrics_slack %}
+
+ {{ criticality.value }} {{ criticality.label }}
+ |
+ {% endfor %}
+
+ {% endif %}
+
+ |
+
+ Summary of TC Criticality
+
+
+ {% for criticality in criticality_metrics|sort(attribute='sort_number') %}
+
+ {{ criticality.value }} Criticality {{ criticality.label }}
+ |
+ {% endfor %}
+
+
+ |
+
+
+ {% endif %}
+
+
+ {% if communication_engine == 'outlook' %}
+
+ {% else %}
+
+ {% endif %}
+
+
+ {% for column in columns %}
+ {{ column.replace('-', ' ').replace('_', ' ').title() }} |
+ {% endfor %}
+
+
+
+ {% for row in table_rows %}
+
+ {% for cell in row %}
+ {% if cell == 'fail' or cell == 'FAIL' %}
+ {{ cell }} |
+ {% else %}
+ {{ cell }} |
+ {% endif %}
+ {% endfor %}
+
+ {% endfor %}
+
+
+
+
+
diff --git a/tests/sinks/utils/test_writer.py b/tests/sinks/utils/test_writer.py
index 7c8b6fb..8462b94 100644
--- a/tests/sinks/utils/test_writer.py
+++ b/tests/sinks/utils/test_writer.py
@@ -1,4 +1,5 @@
+
import os
import unittest.mock
from datetime import datetime
@@ -26,6 +27,7 @@ def fixture_mock_context():
mock_object.get_dq_expectations = {
"rule": "table_row_count_gt_1",
+ "column_name": "col1",
"description": "table count should be greater than 1",
"rule_type": "query_dq",
"tag": "validity",
@@ -423,6 +425,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"dq_spark_local.customer_order",
"agg_dq",
"sum_of_sales",
+ "sales",
"sum(sales)>10000",
"validity",
"regex format validation for quantity",
@@ -443,6 +446,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"dq_spark_local.customer_order",
"agg_dq",
"sum_of_sales",
+ "sales",
"sum(sales)>10000",
"validity",
"regex format validation for quantity",
@@ -463,6 +467,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"dq_spark_local.customer_order",
"query_dq",
"product_missing_count_threshold",
+ "product_id",
"((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"validity",
"row count threshold",
@@ -483,6 +488,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"dq_spark_local.customer_order",
"query_dq",
"product_missing_count_threshold",
+ "product_id",
"((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"validity",
"row count threshold",
@@ -502,6 +508,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"source_f1",
"_source_dq",
{
@@ -520,6 +527,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"target_f1",
"_source_dq",
{
@@ -539,6 +547,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"source_f1",
"_target_dq",
{
@@ -557,6 +566,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"target_f1",
"_target_dq",
{
@@ -773,6 +783,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"dq_spark_local.customer_order",
"agg_dq",
"sum_of_sales",
+ "sales",
"sum(sales)>10000",
"validity",
"regex format validation for quantity",
@@ -969,6 +980,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"dq_spark_local.customer_order",
"agg_dq",
"sum_of_sales",
+ "sales",
"sum(sales)>10000",
"validity",
"regex format validation for quantity",
@@ -991,6 +1003,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"dq_spark_local.customer_order",
"query_dq",
"product_missing_count_threshold",
+ "product_id",
"((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"validity",
"row count threshold",
@@ -1011,6 +1024,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"source_f1",
"_target_dq",
{
@@ -1029,6 +1043,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"target_f1",
"_target_dq",
{
@@ -1229,6 +1244,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"dq_spark_local.customer_order",
"agg_dq",
"sum_of_sales",
+ "sales",
"sum(sales)>10000",
"validity",
"regex format validation for quantity",
@@ -1427,6 +1443,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"dq_spark_local.customer_order",
"agg_dq",
"sum_of_sales",
+ "sales",
"sum(sales)>10000",
"validity",
"regex format validation for quantity",
@@ -1448,6 +1465,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"dq_spark_local.customer_order",
"query_dq",
"product_missing_count_threshold",
+ "product_id",
"((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"validity",
"row count threshold",
@@ -1468,6 +1486,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"dq_spark_local.customer_order",
"query_dq",
"product_missing_count_threshold",
+ "product_id",
"((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"validity",
"row count threshold",
@@ -1487,6 +1506,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"source_f1",
"_source_dq",
{
@@ -1505,6 +1525,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"target_f1",
"_source_dq",
{
@@ -1524,6 +1545,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"source_f1",
"_target_dq",
{
@@ -1542,6 +1564,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer):
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"target_f1",
"_target_dq",
{
@@ -2320,6 +2343,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"row_dq",
"sales_greater_than_zero",
+ "sales",
"sales > 2",
"accuracy",
"sales value should be greater than zero",
@@ -2341,6 +2365,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"agg_dq",
"sum_of_sales",
+ "sales",
"sum(sales)>10000",
"validity",
"regex format validation for quantity",
@@ -2361,6 +2386,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"agg_dq",
"sum_of_sales",
+ "sales",
"sum(sales)>10000",
"validity",
"regex format validation for quantity",
@@ -2381,6 +2407,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"query_dq",
"product_missing_count_threshold",
+ "product_id",
"((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"validity",
"row count threshold",
@@ -2401,6 +2428,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"query_dq",
"product_missing_count_threshold",
+ "product_id",
"((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"validity",
"row count threshold",
@@ -2420,6 +2448,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"source_f1",
"_source_dq",
{
@@ -2438,6 +2467,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"target_f1",
"_source_dq",
{
@@ -2457,6 +2487,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"source_f1",
"_target_dq",
{
@@ -2475,6 +2506,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"target_f1",
"_target_dq",
{
@@ -2493,6 +2525,7 @@ def test_write_error_stats(
"product_id": "product_1",
"table_name": "dq_spark_local.customer_order",
"rule": "sum_of_sales",
+ "column_name": "sales",
"rule_type": "agg_dq",
"source_expectations": "sum(sales)>10000",
"source_dq_status": "fail",
@@ -2574,6 +2607,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"row_dq",
"sales_greater_than_zero",
+ "sales",
"sales > 2",
"accuracy",
"sales value should be greater than zero",
@@ -2595,6 +2629,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"agg_dq",
"sum_of_sales",
+ "sales",
"sum(sales)>10000",
"validity",
"regex format validation for quantity",
@@ -2615,6 +2650,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"agg_dq",
"sum_of_sales",
+ "sales",
"sum(sales)>10000",
"validity",
"regex format validation for quantity",
@@ -2635,6 +2671,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"query_dq",
"product_missing_count_threshold",
+ "product_id",
"((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"validity",
"row count threshold",
@@ -2655,6 +2692,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"query_dq",
"product_missing_count_threshold",
+ "product_id",
"((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"validity",
"row count threshold",
@@ -2674,6 +2712,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"source_f1",
"_source_dq",
{
@@ -2692,6 +2731,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"target_f1",
"_source_dq",
{
@@ -2711,6 +2751,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"source_f1",
"_target_dq",
{
@@ -2729,6 +2770,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"target_f1",
"_target_dq",
{
@@ -2747,6 +2789,7 @@ def test_write_error_stats(
"product_id": "product1",
"table_name": "dq_spark_local.customer_order",
"rule": "sales_greater_than_zero",
+ "column_name": "sales",
"rule_type": "row_dq",
"source_expectations": "sales > 2",
"source_dq_status": "fail",
@@ -2805,6 +2848,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"query_dq",
"product_missing_count_threshold",
+ "product_id",
"((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"validity",
"row count threshold",
@@ -2825,6 +2869,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"source_f1",
"_source_dq",
{
@@ -2843,6 +2888,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"target_f1",
"_source_dq",
{
@@ -2862,6 +2908,7 @@ def test_write_error_stats(
"product_id": "product_1",
"table_name": "dq_spark_local.customer_order",
"rule": "product_missing_count_threshold",
+ "column_name": "product_id",
"rule_type": "query_dq",
"source_expectations": "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"source_dq_status": "pass",
@@ -2942,6 +2989,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"row_dq",
"sales_greater_than_zero",
+ "Sales",
"sales > 2",
"accuracy",
"sales value should be greater than zero",
@@ -2961,6 +3009,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"agg_dq",
"sum_of_sales",
+ "sales",
"sum(sales)>10000",
"validity",
"regex format validation for quantity",
@@ -2979,6 +3028,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"agg_dq",
"sum_of_sales",
+ "sales",
"sum(sales)>10000",
"validity",
"regex format validation for quantity",
@@ -2997,6 +3047,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"query_dq",
"product_missing_count_threshold",
+ "product_id",
"((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"validity",
"row count threshold",
@@ -3017,6 +3068,7 @@ def test_write_error_stats(
"dq_spark_local.customer_order",
"query_dq",
"product_missing_count_threshold",
+ "product_id",
"((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"validity",
"row count threshold",
@@ -3036,6 +3088,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"source_f1",
"_source_dq",
{
@@ -3054,6 +3107,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"target_f1",
"_source_dq",
{
@@ -3073,6 +3127,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"source_f1",
"_target_dq",
{
@@ -3091,6 +3146,7 @@ def test_write_error_stats(
"your_product",
"dq_spark_local.customer_order",
"product_missing_count_threshold",
+ "product_id",
"target_f1",
"_target_dq",
{
@@ -3109,6 +3165,7 @@ def test_write_error_stats(
"product_id": "product_1",
"table_name": "dq_spark_local.customer_order",
"rule": "product_missing_count_threshold",
+ "column_name": "product_id",
"rule_type": "query_dq",
"source_expectations": "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3",
"source_dq_status": "pass",
diff --git a/tests/utils/test_actions.py b/tests/utils/test_actions.py
index 402396b..79ab100 100644
--- a/tests/utils/test_actions.py
+++ b/tests/utils/test_actions.py
@@ -14,6 +14,7 @@
spark = get_spark_session()
+
@pytest.fixture(name="_fixture_df")
def fixture_df():
# Create a sample input dataframe
@@ -24,7 +25,7 @@ def fixture_df():
{"row_id": 2, "col1": 3, "col2": "c"},
]
)
-
+
return _fixture_df
@@ -33,7 +34,7 @@ def fixture_mock_context():
# fixture for mock context
mock_object = Mock(spec=SparkExpectationsContext)
mock_object.product_id = "product1"
- mock_object.spark=spark
+ mock_object.spark = spark
mock_object.get_row_dq_rule_type_name = "row_dq"
mock_object.get_agg_dq_rule_type_name = "agg_dq"
mock_object.get_query_dq_rule_type_name = "query_dq"
@@ -41,21 +42,22 @@ def fixture_mock_context():
mock_object.get_query_dq_detailed_stats_status = True
mock_object.get_querydq_secondary_queries = {
- 'product_1|test_table|table_row_count_gt_1' :
-
- {
- 'source_f1': 'select count(*) from query_test_table','target_f1': 'select count(*) from query_test_table_target'
- },
+ 'product_1|test_table|table_row_count_gt_1':
- 'product_1|test_table|table_distinct_count' :
-
- {
- 'source_f1': 'select distinct col1, col2 from query_test_table','target_f1': 'elect distinct col1, col2 from query_test_table_target'
- }
+ {
+ 'source_f1': 'select count(*) from query_test_table',
+ 'target_f1': 'select count(*) from query_test_table_target'
+ },
+ 'product_1|test_table|table_distinct_count':
+ {
+ 'source_f1': 'select distinct col1, col2 from query_test_table',
+ 'target_f1': 'elect distinct col1, col2 from query_test_table_target'
}
+ }
+
mock_object.get_supported_df_query_dq = spark.createDataFrame(
[
{
@@ -71,7 +73,7 @@ def fixture_mock_context_without_detailed_stats():
# fixture for mock context without_detailed_stats
mock_object = Mock(spec=SparkExpectationsContext)
mock_object.product_id = "product1"
- mock_object.spark=spark
+ mock_object.spark = spark
mock_object.get_row_dq_rule_type_name = "row_dq"
mock_object.get_agg_dq_rule_type_name = "agg_dq"
mock_object.get_query_dq_rule_type_name = "query_dq"
@@ -87,57 +89,59 @@ def fixture_mock_context_without_detailed_stats():
return mock_object
-
@pytest.fixture(name="_fixture_agg_dq_rule")
def fixture_agg_dq_rule():
# Define the expectations for the data quality rules
return {
- "rule_type": "agg_dq",
- "rule": "col1_sum_gt_eq_6",
- "expectation": "sum(col1)>=6",
- "action_if_failed": "ignore",
- "table_name": "test_table",
- "tag": "validity",
- "enable_for_source_dq_validation": True,
- "description": "col1 sum gt 1",
- "product_id": "product_1"
- }
+ "rule_type": "agg_dq",
+ "rule": "col1_sum_gt_eq_6",
+ "column_name": "col1",
+ "expectation": "sum(col1)>=6",
+ "action_if_failed": "ignore",
+ "table_name": "test_table",
+ "tag": "validity",
+ "enable_for_source_dq_validation": True,
+ "description": "col1 sum gt 1",
+ "product_id": "product_1"
+ }
@pytest.fixture(name="_fixture_agg_dq_rule_type_range")
def _fixture_agg_dq_rule_type_range():
# Define the expectations for the data quality rules
- return {
- "rule_type": "agg_dq",
- "rule": "col1_sum_gt_6_and_lt_10",
- "expectation": "sum(col1)>6 and sum(col1)<10",
- "action_if_failed": "ignore",
- "table_name": "test_table",
- "tag": "validity",
- "enable_for_source_dq_validation": True,
- "description": "sum of col1 is greater than 6 and sum of col1 is less than 10",
- "product_id": "product_1"
- }
-
+ return {
+ "rule_type": "agg_dq",
+ "rule": "col1_sum_gt_6_and_lt_10",
+ "column_name": "col1",
+ "expectation": "sum(col1)>6 and sum(col1)<10",
+ "action_if_failed": "ignore",
+ "table_name": "test_table",
+ "tag": "validity",
+ "enable_for_source_dq_validation": True,
+ "description": "sum of col1 is greater than 6 and sum of col1 is less than 10",
+ "product_id": "product_1"
+ }
+
@pytest.fixture(name="_fixture_query_dq_rule")
def fixture_query_dq_rule():
# Define the expectations for the data quality rules
return {
- "product_id": "product_1",
- "rule_type": "query_dq",
- "rule": "table_row_count_gt_1",
- "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>1",
- "action_if_failed": "ignore",
- "table_name": "test_table",
- "tag": "validity",
- "enable_for_target_dq_validation": True,
- "enable_for_source_dq_validation": True,
- "enable_querydq_custom_output": True,
- "expectation_source_f1": "select count(*) from query_test_table",
- "expectation_target_f1": "select count(*) from query_test_table_target",
- "description": "table count should be greater than 1"
- }
+ "product_id": "product_1",
+ "rule_type": "query_dq",
+ "rule": "table_row_count_gt_1",
+ "column_name": "col1",
+ "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>1",
+ "action_if_failed": "ignore",
+ "table_name": "test_table",
+ "tag": "validity",
+ "enable_for_target_dq_validation": True,
+ "enable_for_source_dq_validation": True,
+ "enable_querydq_custom_output": True,
+ "expectation_source_f1": "select count(*) from query_test_table",
+ "expectation_target_f1": "select count(*) from query_test_table_target",
+ "description": "table count should be greater than 1"
+ }
@pytest.fixture(name="_fixture_expectations")
@@ -149,6 +153,7 @@ def fixture_expectations():
"product_id": "product_1",
"rule_type": "row_dq",
"rule": "col1_gt_eq_1",
+ "column_name": "col1",
"expectation": "col1 >=1",
"action_if_failed": "ignore",
"table_name": "test_table",
@@ -159,6 +164,7 @@ def fixture_expectations():
"product_id": "product_1",
"rule_type": "row_dq",
"rule": "col1_gt_eq_2",
+ "column_name": "col1",
"expectation": "col1 >= 2",
"action_if_failed": "drop",
"table_name": "test_table",
@@ -169,6 +175,7 @@ def fixture_expectations():
"product_id": "product_1",
"rule_type": "row_dq",
"rule": "col1_gt_eq_3",
+ "column_name": "col1",
"expectation": "col1 >= 3",
"action_if_failed": "fail",
"table_name": "test_table",
@@ -181,6 +188,7 @@ def fixture_expectations():
"product_id": "product_1",
"rule_type": "agg_dq",
"rule": "col1_sum_gt_eq_6",
+ "column_name": "col1",
"expectation": "sum(col1)>=6",
"action_if_failed": "ignore",
"table_name": "test_table",
@@ -193,6 +201,7 @@ def fixture_expectations():
"product_id": "product_1",
"rule_type": "agg_dq",
"rule": "col2_unique_value_gt_3",
+ "column_name": "col1",
"expectation": "count(distinct col2)>3",
"action_if_failed": "fail",
"table_name": "test_table",
@@ -205,6 +214,7 @@ def fixture_expectations():
"product_id": "product_1",
"rule_type": "agg_dq",
"rule": "col1_sum_gt_6_and_lt_10",
+ "column_name": "col1",
"expectation": "sum(col1)>6 and sum(col1)<10",
"action_if_failed": "fail",
"table_name": "test_table",
@@ -219,6 +229,7 @@ def fixture_expectations():
"product_id": "product_1",
"rule_type": "query_dq",
"rule": "table_row_count_gt_1",
+ "column_name": "col1",
"expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>1",
"enable_querydq_custom_output": True,
"action_if_failed": "ignore",
@@ -234,6 +245,7 @@ def fixture_expectations():
"product_id": "product_1",
"rule_type": "query_dq",
"rule": "table_distinct_count",
+ "column_name": "col1",
"expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>3",
"enable_querydq_custom_output": False,
"action_if_failed": "fail",
@@ -254,66 +266,67 @@ def fixture_expectations():
def fixture_agg_dq_detailed_expected_result():
# define the expected result for row dq operations
return {
- "result":
+ "result":
{
"product_id": "product_1",
"table_name": "test_table",
"rule_type": "agg_dq",
"rule": "col1_sum_gt_eq_6",
+ "column_name": "col1",
"expectation": "sum(col1)>=6",
"tag": "validity",
"status": "pass",
"description": "col1 sum gt 1",
- "actual_value" : 6,
- "expected_value" : '>=6'
-
+ "actual_value": 6,
+ "expected_value": '>=6'
+
},
- "result_query_dq":
+ "result_query_dq":
{
"product_id": "product_1",
"table_name": "test_table",
"rule_type": "query_dq",
"rule": "table_row_count_gt_1",
+ "column_name": "col1",
"expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>1",
"tag": "validity",
"status": "fail",
"description": "table count should be greater than 1",
- "actual_value" : 0,
- "expected_value" : '>1'
-
+ "actual_value": 0,
+ "expected_value": '>1'
+
},
- "result_without_context":
+ "result_without_context":
{
"product_id": "product_1",
"table_name": "test_table",
"rule_type": "agg_dq",
"rule": "col1_sum_gt_eq_6",
+ "column_name": "col1",
"expectation": "sum(col1)>=6",
"tag": "validity",
"status": None,
"description": "col1 sum gt 1",
- "actual_value" : None,
- "expected_value" : None
-
+ "actual_value": None,
+ "expected_value": None
+
},
- "result_without_context1":
+ "result_without_context1":
{
"product_id": "product_1",
"table_name": "test_table",
"rule_type": "agg_dq",
- "rule":"col1_sum_gt_6_and_lt_10",
+ "rule": "col1_sum_gt_6_and_lt_10",
+ "column_name": "col1",
"expectation": "sum(col1)>6 and sum(col1)<10",
"tag": "validity",
"status": "fail",
"description": "sum of col1 is greater than 6 and sum of col1 is less than 10",
- "actual_value" : 6,
- "expected_value" : '6>6 and 6<10'
-
- }
- }
-
-
+ "actual_value": 6,
+ "expected_value": '6>6 and 6<10'
+ }
+ }
@pytest.fixture(name="_fixture_row_dq_expected_result")
@@ -370,13 +383,13 @@ def fixture_agg_dq_expected_result():
"tag": "accuracy",
"description": "col2 unique value grater than 3"
},
- {
- "rule_type": "agg_dq",
- "rule": "col1_sum_gt_6_and_lt_10",
- "action_if_failed": "fail",
- "tag": "accuracy",
- "description": "sum of col1 value grater than 6 and less than 10"
- }
+ {
+ "rule_type": "agg_dq",
+ "rule": "col1_sum_gt_6_and_lt_10",
+ "action_if_failed": "fail",
+ "tag": "accuracy",
+ "description": "sum of col1 value grater than 6 and less than 10"
+ }
]
}
@@ -387,22 +400,22 @@ def fixture_query_dq_expected_result():
return {
"result":
[
- {
- 'rule': 'table_row_count_gt_1',
- 'description': 'table count should be greater than 1',
- 'rule_type': 'query_dq',
- 'tag': 'validity',
- 'action_if_failed': 'ignore'
- },
- {
- 'rule': 'table_distinct_count',
- 'description': 'table distinct row count should be greater than 3',
- 'rule_type': 'query_dq',
- 'tag': 'accuracy',
- 'action_if_failed': 'fail'
- }
- ]
- }
+ {
+ 'rule': 'table_row_count_gt_1',
+ 'description': 'table count should be greater than 1',
+ 'rule_type': 'query_dq',
+ 'tag': 'validity',
+ 'action_if_failed': 'ignore'
+ },
+ {
+ 'rule': 'table_distinct_count',
+ 'description': 'table distinct row count should be greater than 3',
+ 'rule_type': 'query_dq',
+ 'tag': 'accuracy',
+ 'action_if_failed': 'fail'
+ }
+ ]
+ }
import pytest
@@ -475,140 +488,146 @@ def compare_result(_actual_output, _expected_output):
compare_result(actual_output, expected_output)
-
-
@pytest.mark.parametrize("_query_dq_rule, query_dq_detailed_expected_result, _source_dq_status,_target_dq_status", [
# expectations rule
({
- "product_id": "product_1",
- "rule_type": "query_dq",
- "rule": "table_row_count_gt_1",
- "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)",
- "enable_querydq_custom_output": True,
- "action_if_failed": "ignore",
- "table_name": "test_table",
- "tag": "validity",
- "enable_for_target_dq_validation": True,
- "enable_for_source_dq_validation": True,
- "description": "table count should be greater than 1",
- "expectation_source_f1": "select count(*) from query_test_table",
- "expectation_target_f1": "select count(*) from query_test_table_target"
- },
+ "product_id": "product_1",
+ "rule_type": "query_dq",
+ "rule": "table_row_count_gt_1",
+ "column_name": "col1",
+ "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)",
+ "enable_querydq_custom_output": True,
+ "action_if_failed": "ignore",
+ "table_name": "test_table",
+ "tag": "validity",
+ "enable_for_target_dq_validation": True,
+ "enable_for_source_dq_validation": True,
+ "description": "table count should be greater than 1",
+ "expectation_source_f1": "select count(*) from query_test_table",
+ "expectation_target_f1": "select count(*) from query_test_table_target"
+ },
# result in spark col object
{
- "product_id": "product_1",
- "table_name": "test_table",
- "rule_type": "query_dq",
- "rule": "table_row_count_gt_1",
- "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)",
- "tag": "validity",
- "status": "fail",
- "description": "table count should be greater than 1",
- "actual_value" : 0,
- "expected_value" : '>3'
- },True,False),
+ "product_id": "product_1",
+ "table_name": "test_table",
+ "rule_type": "query_dq",
+ "rule": "table_row_count_gt_1",
+ "column_name": "col1",
+ "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)",
+ "tag": "validity",
+ "status": "fail",
+ "description": "table count should be greater than 1",
+ "actual_value": 0,
+ "expected_value": '>3'
+ }, True, False),
# expectations rule
({
- "product_id": "product_1",
- "rule_type": "query_dq",
- "rule": "table_distinct_count",
- "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))",
- "enable_querydq_custom_output": False,
- "action_if_failed": "fail",
- "table_name": "test_table",
- "tag": "accuracy",
- "enable_for_target_dq_validation": True,
- "enable_for_source_dq_validation": True,
- "description": "table distinct row count should be greater than 3",
- "expectation_source_f1": "select count(*) from (select distinct col1, col2 from query_test_table)",
- "expectation_target_f1": "select count(*) from (select distinct col1, col2 from query_test_table_target)"
- },
+ "product_id": "product_1",
+ "rule_type": "query_dq",
+ "rule": "table_distinct_count",
+ "column_name": "col1",
+ "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))",
+ "enable_querydq_custom_output": False,
+ "action_if_failed": "fail",
+ "table_name": "test_table",
+ "tag": "accuracy",
+ "enable_for_target_dq_validation": True,
+ "enable_for_source_dq_validation": True,
+ "description": "table distinct row count should be greater than 3",
+ "expectation_source_f1": "select count(*) from (select distinct col1, col2 from query_test_table)",
+ "expectation_target_f1": "select count(*) from (select distinct col1, col2 from query_test_table_target)"
+ },
# result in spark col object
{
- "product_id": "product_1",
- "table_name": "test_table",
- "rule_type": "query_dq",
- "rule": "table_distinct_count",
- "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))",
- "tag": "accuracy",
- "status": "fail",
- "description": "table distinct row count should be greater than 3",
- "actual_value" : 0,
- "expected_value" : '>3'
- },False, True
- ),
+ "product_id": "product_1",
+ "table_name": "test_table",
+ "rule_type": "query_dq",
+ "rule": "table_distinct_count",
+ "column_name": "col1",
+ "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))",
+ "tag": "accuracy",
+ "status": "fail",
+ "description": "table distinct row count should be greater than 3",
+ "actual_value": 0,
+ "expected_value": '>3'
+ }, False, True
+ ),
])
def test_agg_query_dq_detailed_result_with_querdq_v2(_fixture_df,
- _query_dq_rule,
- query_dq_detailed_expected_result,
- _fixture_mock_context,_source_dq_status,_target_dq_status):
-
+ _query_dq_rule,
+ query_dq_detailed_expected_result,
+ _fixture_mock_context, _source_dq_status, _target_dq_status):
_fixture_df.createOrReplaceTempView("query_test_table")
_fixture_df.createOrReplaceTempView("query_test_table_target")
- result_out,result_output = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, _query_dq_rule,_fixture_df,[],_source_dq_status=_source_dq_status,_target_dq_status=_target_dq_status
- )
- print("result_df:",result_output)
- print("query_dq_detailed_expected_result:",query_dq_detailed_expected_result)
-
+ result_out, result_output = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context,
+ _query_dq_rule, _fixture_df, [],
+ _source_dq_status=_source_dq_status,
+ _target_dq_status=_target_dq_status
+ )
+ print("result_df:", result_output)
+ print("query_dq_detailed_expected_result:", query_dq_detailed_expected_result)
assert result_output[1] == query_dq_detailed_expected_result.get("product_id")
assert result_output[2] == query_dq_detailed_expected_result.get("table_name")
assert result_output[3] == query_dq_detailed_expected_result.get("rule_type")
assert result_output[4] == query_dq_detailed_expected_result.get("rule")
- assert result_output[5] == query_dq_detailed_expected_result.get("expectation")
- assert result_output[6] == query_dq_detailed_expected_result.get("tag")
- assert result_output[7] == query_dq_detailed_expected_result.get("description")
- assert result_output[8] == query_dq_detailed_expected_result.get("status")
-
- assert result_output[9] == query_dq_detailed_expected_result.get("actual_value")
- assert result_output[10] == query_dq_detailed_expected_result.get("expected_value")
+ assert result_output[5] == query_dq_detailed_expected_result.get("column_name")
+ assert result_output[6] == query_dq_detailed_expected_result.get("expectation")
+ assert result_output[7] == query_dq_detailed_expected_result.get("tag")
+ assert result_output[8] == query_dq_detailed_expected_result.get("description")
+ assert result_output[9] == query_dq_detailed_expected_result.get("status")
+ assert result_output[10] == query_dq_detailed_expected_result.get("actual_value")
+ assert result_output[11] == query_dq_detailed_expected_result.get("expected_value")
@pytest.mark.parametrize("_query_dq_rule_exception", [
# expectations rule
({
- "product_id": "product_1",
- "rule_type": "query_dq",
- "rule": "table_row_count_gt_1",
- "expectation": "(select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)",
- "enable_querydq_custom_output": True,
- "action_if_failed": "ignore",
- "table_name": "test_table",
- "tag": "validity",
- "enable_for_target_dq_validation": True,
- "description": "table count should be greater than 1",
- "expectation_source_f1": "select count(*) from query_test_table",
- "expectation_target_f1": "select count(*) from query_test_table_target"
+ "product_id": "product_1",
+ "rule_type": "query_dq",
+ "rule": "table_row_count_gt_1",
+ "column_name": "col1",
+ "expectation": "(select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)",
+ "enable_querydq_custom_output": True,
+ "action_if_failed": "ignore",
+ "table_name": "test_table",
+ "tag": "validity",
+ "enable_for_target_dq_validation": True,
+ "description": "table count should be greater than 1",
+ "expectation_source_f1": "select count(*) from query_test_table",
+ "expectation_target_f1": "select count(*) from query_test_table_target"
}
),
# expectations rule
({
- "product_id": "product_1",
- "rule_type": "query_dq",
- "rule": "table_distinct_count",
- "expectation": "(select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))",
- "enable_querydq_custom_output": False,
- "action_if_failed": "fail",
- "table_name": "test_table",
- "tag": "accuracy",
- "enable_for_target_dq_validation": True,
- "enable_for_source_dq_validation": True,
- "description": "table distinct row count should be greater than 3",
- "expectation_source_f1": "select count(*) from (select distinct col1, col2 from query_test_table)",
- "expectation_target_f1": "select count(*) from (select distinct col1, col2 from query_test_table_target)"
+ "product_id": "product_1",
+ "rule_type": "query_dq",
+ "rule": "table_distinct_count",
+ "column_name": "col1",
+ "expectation": "(select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))",
+ "enable_querydq_custom_output": False,
+ "action_if_failed": "fail",
+ "table_name": "test_table",
+ "tag": "accuracy",
+ "enable_for_target_dq_validation": True,
+ "enable_for_source_dq_validation": True,
+ "description": "table distinct row count should be greater than 3",
+ "expectation_source_f1": "select count(*) from (select distinct col1, col2 from query_test_table)",
+ "expectation_target_f1": "select count(*) from (select distinct col1, col2 from query_test_table_target)"
}
- ),
-
+ ),
+
])
def test_agg_query_dq_detailed_result_exception_v2(_fixture_df,
- _query_dq_rule_exception,_fixture_mock_context):
+ _query_dq_rule_exception, _fixture_mock_context):
# faulty user input is given to test the exception functionality of the agg_query_dq_detailed_result
_fixture_df.createOrReplaceTempView("query_test_table")
_fixture_df.createOrReplaceTempView("query_test_table_target")
with pytest.raises(SparkExpectationsMiscException,
match=r"(error occurred while running agg_query_dq_detailed_result Sql query is invalid. *)|(error occurred while running agg_query_dq_detailed_result Regex match not found. *)"):
- SparkExpectationsActions().agg_query_dq_detailed_result(_fixture_mock_context, _query_dq_rule_exception,_fixture_df,[] )
+ SparkExpectationsActions().agg_query_dq_detailed_result(_fixture_mock_context, _query_dq_rule_exception,
+ _fixture_df, [])
@pytest.mark.parametrize("input_df, rule_type_name, expected_output",
@@ -646,7 +665,8 @@ def test_create_agg_dq_results(input_df,
rule_type_name,
expected_output, _fixture_mock_context):
# unit test case on create_agg_dq_results
- assert SparkExpectationsActions().create_agg_dq_results(_fixture_mock_context,input_df, rule_type_name, ) == expected_output
+ assert SparkExpectationsActions().create_agg_dq_results(_fixture_mock_context, input_df,
+ rule_type_name, ) == expected_output
@pytest.mark.parametrize("input_df",
@@ -667,100 +687,107 @@ def test_create_agg_dq_results_exception(input_df,
def test_agg_query_dq_detailed_result_exception(_fixture_df,
- _fixture_query_dq_rule):
+ _fixture_query_dq_rule):
_mock_object_context = Mock(spec=SparkExpectationsContext)
# faulty user input is given to test the exception functionality of the agg_query_dq_detailed_result
-
+
with pytest.raises(SparkExpectationsMiscException,
match=r"error occurred while running agg_query_dq_detailed_result .*"):
- SparkExpectationsActions().agg_query_dq_detailed_result(_mock_object_context, "_fixture_query_dq_rule","",[] )
+ SparkExpectationsActions().agg_query_dq_detailed_result(_mock_object_context, "_fixture_query_dq_rule", "",
+ [])
def test_agg_query_dq_detailed_result(_fixture_df,
- _fixture_agg_dq_rule,
- _fixture_agg_dq_detailed_expected_result,
- _fixture_mock_context):
- result_out,result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, _fixture_agg_dq_rule,_fixture_df,[]
- )
-
-
+ _fixture_agg_dq_rule,
+ _fixture_agg_dq_detailed_expected_result,
+ _fixture_mock_context):
+ result_out, result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context,
+ _fixture_agg_dq_rule, _fixture_df, []
+ )
+
assert result_df[1] == _fixture_agg_dq_detailed_expected_result.get("result").get("product_id")
assert result_df[2] == _fixture_agg_dq_detailed_expected_result.get("result").get("table_name")
assert result_df[3] == _fixture_agg_dq_detailed_expected_result.get("result").get("rule_type")
assert result_df[4] == _fixture_agg_dq_detailed_expected_result.get("result").get("rule")
- assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result").get("expectation")
- assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result").get("tag")
- assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result").get("description")
- assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result").get("status")
-
- assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result").get("actual_value")
- assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result").get("expected_value")
+ assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result").get("column_name")
+ assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result").get("expectation")
+ assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result").get("tag")
+ assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result").get("description")
+ assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result").get("status")
+
+ assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result").get("actual_value")
+ assert result_df[11] == _fixture_agg_dq_detailed_expected_result.get("result").get("expected_value")
def test_agg_query_dq_detailed_result_with_range_rule_type(_fixture_df,
- _fixture_agg_dq_rule_type_range,
- _fixture_agg_dq_detailed_expected_result,
- _fixture_mock_context):
- result_out,result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, _fixture_agg_dq_rule_type_range,_fixture_df,[]
- )
-
+ _fixture_agg_dq_rule_type_range,
+ _fixture_agg_dq_detailed_expected_result,
+ _fixture_mock_context):
+ result_out, result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context,
+ _fixture_agg_dq_rule_type_range,
+ _fixture_df, []
+ )
+
assert result_df[1] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("product_id")
assert result_df[2] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("table_name")
assert result_df[3] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("rule_type")
assert result_df[4] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("rule")
- assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("expectation")
- assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("tag")
- assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("description")
- assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("status")
-
- assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("actual_value")
- assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("expected_value")
+ assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("column_name")
+ assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("expectation")
+ assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("tag")
+ assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("description")
+ assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("status")
+
+ assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("actual_value")
+ assert result_df[11] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get(
+ "expected_value")
def test_agg_query_dq_detailed_result_with_querdq(_fixture_df,
- _fixture_query_dq_rule,
- _fixture_agg_dq_detailed_expected_result,
- _fixture_mock_context):
-
+ _fixture_query_dq_rule,
+ _fixture_agg_dq_detailed_expected_result,
+ _fixture_mock_context):
_fixture_df.createOrReplaceTempView("query_test_table")
_fixture_df.createOrReplaceTempView("query_test_table_target")
- result_out,result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, _fixture_query_dq_rule,_fixture_df,[]
- )
-
+ result_out, result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context,
+ _fixture_query_dq_rule, _fixture_df,
+ []
+ )
+
assert result_df[1] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("product_id")
assert result_df[2] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("table_name")
assert result_df[3] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("rule_type")
assert result_df[4] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("rule")
- assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("expectation")
- assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("tag")
- assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("description")
- assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("status")
-
- assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("actual_value")
- assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("expected_value")
+ assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("column_name")
+ assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("expectation")
+ assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("tag")
+ assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("description")
+ assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("status")
+
+ assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("actual_value")
+ assert result_df[11] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("expected_value")
def test_agg_query_dq_detailed_result_without_detailed_context(_fixture_df,
- _fixture_agg_dq_rule,
- _fixture_agg_dq_detailed_expected_result,
- _fixture_mock_context_without_detailed_stats):
- result_out,result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context_without_detailed_stats, _fixture_agg_dq_rule,_fixture_df,[]
- )
-
+ _fixture_agg_dq_rule,
+ _fixture_agg_dq_detailed_expected_result,
+ _fixture_mock_context_without_detailed_stats):
+ result_out, result_df = SparkExpectationsActions.agg_query_dq_detailed_result(
+ _fixture_mock_context_without_detailed_stats, _fixture_agg_dq_rule, _fixture_df, []
+ )
-
assert result_df[1] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("product_id")
assert result_df[2] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("table_name")
assert result_df[3] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("rule_type")
assert result_df[4] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("rule")
- assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("expectation")
- assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("tag")
- assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("description")
- assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("status")
-
- assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("actual_value")
- assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("expected_value")
+ assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("column_name")
+ assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("expectation")
+ assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("tag")
+ assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("description")
+ assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("status")
+ assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("actual_value")
+ assert result_df[11] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("expected_value")
def test_run_dq_rules_row(_fixture_df,
@@ -791,14 +818,14 @@ def test_run_dq_rules_row(_fixture_df,
(True, False),
(False, True),
])
-
def test_run_dq_rules_agg(_fixture_df,
_fixture_expectations,
_fixture_agg_dq_expected_result,
- _fixture_mock_context,agg_dq_source_dq_status,agg_dq_target_dq_status):
+ _fixture_mock_context, agg_dq_source_dq_status, agg_dq_target_dq_status):
# Apply the data quality rules
- result_df = SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _fixture_expectations,"agg_dq",agg_dq_source_dq_status,agg_dq_target_dq_status)
+ result_df = SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _fixture_expectations,
+ "agg_dq", agg_dq_source_dq_status, agg_dq_target_dq_status)
# Assert that the result dataframe has the expected number of columns
assert len(result_df.columns) == 1
@@ -810,8 +837,6 @@ def test_run_dq_rules_agg(_fixture_df,
assert row.meta_agg_dq_results == _fixture_agg_dq_expected_result.get("result")
-
-
@pytest.mark.parametrize("query_dq_source_dq_status,query_dq_target_dq_status", [
(True, False),
(False, True),
@@ -819,12 +844,13 @@ def test_run_dq_rules_agg(_fixture_df,
def test_run_dq_rules_query(_fixture_df,
_fixture_expectations,
_fixture_query_dq_expected_result,
- _fixture_mock_context,query_dq_source_dq_status,query_dq_target_dq_status):
+ _fixture_mock_context, query_dq_source_dq_status, query_dq_target_dq_status):
# Apply the data quality rules
_fixture_df.createOrReplaceTempView("query_test_table")
_fixture_df.createOrReplaceTempView("query_test_table_target")
-
- result_df = SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _fixture_expectations,"query_dq",query_dq_source_dq_status,query_dq_target_dq_status)
+
+ result_df = SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _fixture_expectations,
+ "query_dq", query_dq_source_dq_status, query_dq_target_dq_status)
# Assert that the result dataframe has the expected number of columns
assert len(result_df.columns) == 1
@@ -1535,6 +1561,7 @@ def test_run_dq_rules_condition_expression_exception(_fixture_df,
{
"rule_type": "query_dq",
"rule": "table_row_count_gt_1",
+ "column_name": "col1",
"expectation": "(select count(*) from query_test_table)>1",
"action_if_failed": "ignore",
"table_name": "test_table",
@@ -1542,26 +1569,25 @@ def test_run_dq_rules_condition_expression_exception(_fixture_df,
"enable_for_target_dq_validation": False,
"description": "table count should be greater than 1"
},
-
- ],
- }
+
+ ],
+ }
_fixture_df.createOrReplaceTempView("query_test_table")
with pytest.raises(SparkExpectationsMiscException,
match=r"error occurred while running expectations .*"):
-
SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _expectations,
- "query_dq", False, True)
+ "query_dq", False, True)
@pytest.mark.parametrize("_rule_test", [
-
+
({"rule_type": "query"}),
-
+
])
def test_run_dq_rules_condition_expression_dynamic_exception(_fixture_df,
- _fixture_query_dq_expected_result,
- _fixture_mock_context,_rule_test):
+ _fixture_query_dq_expected_result,
+ _fixture_mock_context, _rule_test):
# Apply the data quality rules
_expectations = {"query_rules": [
{
@@ -1574,13 +1600,13 @@ def test_run_dq_rules_condition_expression_dynamic_exception(_fixture_df,
"enable_for_target_dq_validation": False,
"description": "table count should be greater than 1"
},
-
- ],
- }
+
+ ],
+ }
_fixture_df.createOrReplaceTempView("query_test_table")
with pytest.raises(SparkExpectationsMiscException,
match=r"error occurred while running expectations .*"):
- _rule_type= _rule_test.get("rule_type")
+ _rule_type = _rule_test.get("rule_type")
SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _expectations,
- _rule_type, False, True)
+ _rule_type, False, True)