From a33c3aa7c1d91b8bc0d1f06acc6947d7c858d099 Mon Sep 17 00:00:00 2001 From: Sudeepta pal <111543327+sudeep7978@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:29:46 +0530 Subject: [PATCH 1/9] adding additional column in the detailed table. Add Column for Column-Level Visibility in Data Quality Framework Result Table --- spark_expectations/sinks/utils/writer.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/spark_expectations/sinks/utils/writer.py b/spark_expectations/sinks/utils/writer.py index d31fbf5..7013364 100644 --- a/spark_expectations/sinks/utils/writer.py +++ b/spark_expectations/sinks/utils/writer.py @@ -135,6 +135,7 @@ def get_row_dq_detailed_stats( str, str, str, + str, None, None, int, @@ -176,6 +177,7 @@ def get_row_dq_detailed_stats( for _rowdq_rule in _row_dq_expectations: # if _rowdq_rule["rule"] in _dq_res: + failed_row_count = _dq_res[_rowdq_rule["rule"]] _row_dq_result.append( ( @@ -184,6 +186,7 @@ def get_row_dq_detailed_stats( _table_name, _rowdq_rule["rule_type"], _rowdq_rule["rule"], + _rowdq_rule["column_name"], _rowdq_rule["expectation"], _rowdq_rule["tag"], _rowdq_rule["description"], @@ -322,6 +325,7 @@ def _prep_secondary_query_output(self) -> DataFrame: "product_id", "table_name", "rule", + "column_name", "alias", "dq_type", "source_dq", @@ -354,7 +358,8 @@ def _prep_secondary_query_output(self) -> DataFrame: + "target.source_dq as target_output from _df_custom_detailed_stats_source as source " + "left outer join _df_custom_detailed_stats_source as target " + "on source.run_id=target.run_id and source.product_id=target.product_id and " - + "source.table_name=target.table_name and source.rule=target.rule " + + "source.table_name=target.table_name and source.rule=target.rule and " + + "source.column_name = target.column_name and source.dq_type = target.dq_type " + "and source.alias_comp=target.alias_comp " + "and source.compare = 'source' and target.compare = 'target' " ) @@ -395,6 +400,7 @@ def _prep_detailed_stats( "table_name", "rule_type", "rule", + "column_name", "source_expectations", "tag", "description", @@ -415,6 +421,7 @@ def _prep_detailed_stats( "table_name", "rule_type", "rule", + "column_name", "target_expectations", "tag", "description", @@ -483,7 +490,7 @@ def _prep_detailed_stats( _df_detailed_stats = _df_source_aggquery_detailed_stats.join( _df_target_aggquery_detailed_stats, - ["run_id", "product_id", "table_name", "rule_type", "rule"], + ["run_id", "product_id", "table_name", "rule_type", "rule", "column_name"], "full_outer", ) @@ -605,6 +612,7 @@ def write_error_stats(self) -> None: input_count: int = self._context.get_input_count error_count: int = self._context.get_error_count output_count: int = self._context.get_output_count + source_agg_dq_result: Optional[ List[Dict[str, str]] ] = self._context.get_source_agg_dq_result @@ -760,8 +768,6 @@ def write_error_stats(self) -> None: .withColumn("success_percentage", sql_round(df.success_percentage, 2)) .withColumn("error_percentage", sql_round(df.error_percentage, 2)) ) - - self._context.set_stats_dict(df) _log.info( "Writing metrics to the stats table: %s, started", self._context.get_dq_stats_table_name, From 5ee0376926ab6353c58e762f5527967745e61de0 Mon Sep 17 00:00:00 2001 From: Sudeepta pal <111543327+sudeep7978@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:31:59 +0530 Subject: [PATCH 2/9] add a new column in the result table of a data quality framework for column-level visibility. 
Add Column for Column-Level Visibility in Data Quality Framework Result Table --- spark_expectations/utils/actions.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/spark_expectations/utils/actions.py b/spark_expectations/utils/actions.py index 0509aa5..9a45327 100644 --- a/spark_expectations/utils/actions.py +++ b/spark_expectations/utils/actions.py @@ -129,6 +129,7 @@ def agg_query_dq_detailed_result( str, Any, str, + str, dict, str, ] @@ -328,20 +329,24 @@ def agg_query_dq_detailed_result( ) ): for _key, _querydq_query in sub_key_value.items(): - _querydq_df = _context.spark.sql(_dq_rule["expectation" + "_" + _key]) querydq_output.append( ( _context.get_run_id, _dq_rule["product_id"], _dq_rule["table_name"], _dq_rule["rule"], + _dq_rule["column_name"], _key, _query_prefix, dict( [ ( _key, - [row.asDict() for row in _querydq_df.collect()], + _context.spark.sql( + _dq_rule["expectation" + "_" + _key] + ) + .toJSON() + .collect(), ) ] ), @@ -421,6 +426,7 @@ def execute_sql_and_get_result( _dq_rule["table_name"], _dq_rule["rule_type"], _dq_rule["rule"], + _dq_rule["column_name"], _dq_rule["expectation"], _dq_rule["tag"], _dq_rule["description"], From cc9e5a0c7b07f6dd88dc3fff4a3369fc510bfa38 Mon Sep 17 00:00:00 2001 From: Sudeepta pal <111543327+sudeep7978@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:33:42 +0530 Subject: [PATCH 3/9] updated the test case for added column --- tests/sinks/utils/test_writer.py | 57 ++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/tests/sinks/utils/test_writer.py b/tests/sinks/utils/test_writer.py index 7c8b6fb..8462b94 100644 --- a/tests/sinks/utils/test_writer.py +++ b/tests/sinks/utils/test_writer.py @@ -1,4 +1,5 @@ + import os import unittest.mock from datetime import datetime @@ -26,6 +27,7 @@ def fixture_mock_context(): mock_object.get_dq_expectations = { "rule": "table_row_count_gt_1", + "column_name": "col1", "description": "table count should be greater than 1", "rule_type": "query_dq", "tag": "validity", @@ -423,6 +425,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", + "sales", "sum(sales)>10000", "validity", "regex format validation for quantity", @@ -443,6 +446,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", + "sales", "sum(sales)>10000", "validity", "regex format validation for quantity", @@ -463,6 +467,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", + "product_id", "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "validity", "row count threshold", @@ -483,6 +488,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", + "product_id", "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "validity", "row count threshold", @@ -502,6 +508,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "source_f1", "_source_dq", { @@ 
-520,6 +527,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "target_f1", "_source_dq", { @@ -539,6 +547,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "source_f1", "_target_dq", { @@ -557,6 +566,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "target_f1", "_target_dq", { @@ -773,6 +783,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", + "sales", "sum(sales)>10000", "validity", "regex format validation for quantity", @@ -969,6 +980,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", + "sales", "sum(sales)>10000", "validity", "regex format validation for quantity", @@ -991,6 +1003,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", + "product_id", "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "validity", "row count threshold", @@ -1011,6 +1024,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "source_f1", "_target_dq", { @@ -1029,6 +1043,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "target_f1", "_target_dq", { @@ -1229,6 +1244,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", + "sales", "sum(sales)>10000", "validity", "regex format validation for quantity", @@ -1427,6 +1443,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", + "sales", "sum(sales)>10000", "validity", "regex format validation for quantity", @@ -1448,6 +1465,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", + "product_id", "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "validity", "row count threshold", @@ -1468,6 +1486,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", + "product_id", "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "validity", "row count threshold", @@ -1487,6 +1506,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "source_f1", "_source_dq", { @@ -1505,6 +1525,7 @@ 
def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "target_f1", "_source_dq", { @@ -1524,6 +1545,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "source_f1", "_target_dq", { @@ -1542,6 +1564,7 @@ def test_get_row_dq_detailed_stats_exception(input_record, _fixture_writer): "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "target_f1", "_target_dq", { @@ -2320,6 +2343,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "row_dq", "sales_greater_than_zero", + "sales", "sales > 2", "accuracy", "sales value should be greater than zero", @@ -2341,6 +2365,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", + "sales", "sum(sales)>10000", "validity", "regex format validation for quantity", @@ -2361,6 +2386,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", + "sales", "sum(sales)>10000", "validity", "regex format validation for quantity", @@ -2381,6 +2407,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", + "product_id", "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "validity", "row count threshold", @@ -2401,6 +2428,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", + "product_id", "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "validity", "row count threshold", @@ -2420,6 +2448,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "source_f1", "_source_dq", { @@ -2438,6 +2467,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "target_f1", "_source_dq", { @@ -2457,6 +2487,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "source_f1", "_target_dq", { @@ -2475,6 +2506,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "target_f1", "_target_dq", { @@ -2493,6 +2525,7 @@ def test_write_error_stats( "product_id": "product_1", "table_name": "dq_spark_local.customer_order", "rule": "sum_of_sales", + "column_name": "sales", "rule_type": "agg_dq", "source_expectations": "sum(sales)>10000", "source_dq_status": "fail", @@ -2574,6 +2607,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "row_dq", "sales_greater_than_zero", + "sales", "sales > 2", "accuracy", "sales value should be greater than zero", @@ -2595,6 +2629,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", + "sales", "sum(sales)>10000", "validity", "regex format validation for quantity", @@ -2615,6 +2650,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", + "sales", "sum(sales)>10000", "validity", "regex format validation for quantity", @@ -2635,6 +2671,7 @@ def 
test_write_error_stats( "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", + "product_id", "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "validity", "row count threshold", @@ -2655,6 +2692,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", + "product_id", "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "validity", "row count threshold", @@ -2674,6 +2712,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "source_f1", "_source_dq", { @@ -2692,6 +2731,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "target_f1", "_source_dq", { @@ -2711,6 +2751,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "source_f1", "_target_dq", { @@ -2729,6 +2770,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "target_f1", "_target_dq", { @@ -2747,6 +2789,7 @@ def test_write_error_stats( "product_id": "product1", "table_name": "dq_spark_local.customer_order", "rule": "sales_greater_than_zero", + "column_name": "sales", "rule_type": "row_dq", "source_expectations": "sales > 2", "source_dq_status": "fail", @@ -2805,6 +2848,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", + "product_id", "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "validity", "row count threshold", @@ -2825,6 +2869,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "source_f1", "_source_dq", { @@ -2843,6 +2888,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "target_f1", "_source_dq", { @@ -2862,6 +2908,7 @@ def test_write_error_stats( "product_id": "product_1", "table_name": "dq_spark_local.customer_order", "rule": "product_missing_count_threshold", + "column_name": "product_id", "rule_type": "query_dq", "source_expectations": "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "source_dq_status": "pass", @@ -2942,6 +2989,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "row_dq", "sales_greater_than_zero", + "Sales", "sales > 2", "accuracy", "sales value should be greater than zero", @@ -2961,6 +3009,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", + "sales", "sum(sales)>10000", "validity", "regex format validation for quantity", @@ -2979,6 +3028,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", + "sales", "sum(sales)>10000", "validity", "regex format validation for quantity", @@ -2997,6 +3047,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", + 
"product_id", "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "validity", "row count threshold", @@ -3017,6 +3068,7 @@ def test_write_error_stats( "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", + "product_id", "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "validity", "row count threshold", @@ -3036,6 +3088,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "source_f1", "_source_dq", { @@ -3054,6 +3107,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "target_f1", "_source_dq", { @@ -3073,6 +3127,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "source_f1", "_target_dq", { @@ -3091,6 +3146,7 @@ def test_write_error_stats( "your_product", "dq_spark_local.customer_order", "product_missing_count_threshold", + "product_id", "target_f1", "_target_dq", { @@ -3109,6 +3165,7 @@ def test_write_error_stats( "product_id": "product_1", "table_name": "dq_spark_local.customer_order", "rule": "product_missing_count_threshold", + "column_name": "product_id", "rule_type": "query_dq", "source_expectations": "((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3", "source_dq_status": "pass", From 55fff867750f4657f6fd0851cfe653fcd59588e0 Mon Sep 17 00:00:00 2001 From: Sudeepta pal <111543327+sudeep7978@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:35:27 +0530 Subject: [PATCH 4/9] Update test case for added column name --- tests/utils/test_actions.py | 572 +++++++++++++++++++----------------- 1 file changed, 299 insertions(+), 273 deletions(-) diff --git a/tests/utils/test_actions.py b/tests/utils/test_actions.py index 402396b..79ab100 100644 --- a/tests/utils/test_actions.py +++ b/tests/utils/test_actions.py @@ -14,6 +14,7 @@ spark = get_spark_session() + @pytest.fixture(name="_fixture_df") def fixture_df(): # Create a sample input dataframe @@ -24,7 +25,7 @@ def fixture_df(): {"row_id": 2, "col1": 3, "col2": "c"}, ] ) - + return _fixture_df @@ -33,7 +34,7 @@ def fixture_mock_context(): # fixture for mock context mock_object = Mock(spec=SparkExpectationsContext) mock_object.product_id = "product1" - mock_object.spark=spark + mock_object.spark = spark mock_object.get_row_dq_rule_type_name = "row_dq" mock_object.get_agg_dq_rule_type_name = "agg_dq" mock_object.get_query_dq_rule_type_name = "query_dq" @@ -41,21 +42,22 @@ def fixture_mock_context(): mock_object.get_query_dq_detailed_stats_status = True mock_object.get_querydq_secondary_queries = { - 'product_1|test_table|table_row_count_gt_1' : - - { - 'source_f1': 'select count(*) from query_test_table','target_f1': 'select count(*) from query_test_table_target' - }, + 'product_1|test_table|table_row_count_gt_1': - 'product_1|test_table|table_distinct_count' : - - { - 'source_f1': 'select distinct col1, col2 from query_test_table','target_f1': 'elect distinct col1, col2 from query_test_table_target' - } + { + 'source_f1': 'select count(*) from query_test_table', + 'target_f1': 'select count(*) from 
query_test_table_target' + }, + 'product_1|test_table|table_distinct_count': + { + 'source_f1': 'select distinct col1, col2 from query_test_table', + 'target_f1': 'elect distinct col1, col2 from query_test_table_target' } + } + mock_object.get_supported_df_query_dq = spark.createDataFrame( [ { @@ -71,7 +73,7 @@ def fixture_mock_context_without_detailed_stats(): # fixture for mock context without_detailed_stats mock_object = Mock(spec=SparkExpectationsContext) mock_object.product_id = "product1" - mock_object.spark=spark + mock_object.spark = spark mock_object.get_row_dq_rule_type_name = "row_dq" mock_object.get_agg_dq_rule_type_name = "agg_dq" mock_object.get_query_dq_rule_type_name = "query_dq" @@ -87,57 +89,59 @@ def fixture_mock_context_without_detailed_stats(): return mock_object - @pytest.fixture(name="_fixture_agg_dq_rule") def fixture_agg_dq_rule(): # Define the expectations for the data quality rules return { - "rule_type": "agg_dq", - "rule": "col1_sum_gt_eq_6", - "expectation": "sum(col1)>=6", - "action_if_failed": "ignore", - "table_name": "test_table", - "tag": "validity", - "enable_for_source_dq_validation": True, - "description": "col1 sum gt 1", - "product_id": "product_1" - } + "rule_type": "agg_dq", + "rule": "col1_sum_gt_eq_6", + "column_name": "col1", + "expectation": "sum(col1)>=6", + "action_if_failed": "ignore", + "table_name": "test_table", + "tag": "validity", + "enable_for_source_dq_validation": True, + "description": "col1 sum gt 1", + "product_id": "product_1" + } @pytest.fixture(name="_fixture_agg_dq_rule_type_range") def _fixture_agg_dq_rule_type_range(): # Define the expectations for the data quality rules - return { - "rule_type": "agg_dq", - "rule": "col1_sum_gt_6_and_lt_10", - "expectation": "sum(col1)>6 and sum(col1)<10", - "action_if_failed": "ignore", - "table_name": "test_table", - "tag": "validity", - "enable_for_source_dq_validation": True, - "description": "sum of col1 is greater than 6 and sum of col1 is less than 10", - "product_id": "product_1" - } - + return { + "rule_type": "agg_dq", + "rule": "col1_sum_gt_6_and_lt_10", + "column_name": "col1", + "expectation": "sum(col1)>6 and sum(col1)<10", + "action_if_failed": "ignore", + "table_name": "test_table", + "tag": "validity", + "enable_for_source_dq_validation": True, + "description": "sum of col1 is greater than 6 and sum of col1 is less than 10", + "product_id": "product_1" + } + @pytest.fixture(name="_fixture_query_dq_rule") def fixture_query_dq_rule(): # Define the expectations for the data quality rules return { - "product_id": "product_1", - "rule_type": "query_dq", - "rule": "table_row_count_gt_1", - "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>1", - "action_if_failed": "ignore", - "table_name": "test_table", - "tag": "validity", - "enable_for_target_dq_validation": True, - "enable_for_source_dq_validation": True, - "enable_querydq_custom_output": True, - "expectation_source_f1": "select count(*) from query_test_table", - "expectation_target_f1": "select count(*) from query_test_table_target", - "description": "table count should be greater than 1" - } + "product_id": "product_1", + "rule_type": "query_dq", + "rule": "table_row_count_gt_1", + "column_name": "col1", + "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>1", + "action_if_failed": "ignore", + "table_name": "test_table", + "tag": "validity", + "enable_for_target_dq_validation": True, + 
"enable_for_source_dq_validation": True, + "enable_querydq_custom_output": True, + "expectation_source_f1": "select count(*) from query_test_table", + "expectation_target_f1": "select count(*) from query_test_table_target", + "description": "table count should be greater than 1" + } @pytest.fixture(name="_fixture_expectations") @@ -149,6 +153,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "row_dq", "rule": "col1_gt_eq_1", + "column_name": "col1", "expectation": "col1 >=1", "action_if_failed": "ignore", "table_name": "test_table", @@ -159,6 +164,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "row_dq", "rule": "col1_gt_eq_2", + "column_name": "col1", "expectation": "col1 >= 2", "action_if_failed": "drop", "table_name": "test_table", @@ -169,6 +175,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "row_dq", "rule": "col1_gt_eq_3", + "column_name": "col1", "expectation": "col1 >= 3", "action_if_failed": "fail", "table_name": "test_table", @@ -181,6 +188,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "agg_dq", "rule": "col1_sum_gt_eq_6", + "column_name": "col1", "expectation": "sum(col1)>=6", "action_if_failed": "ignore", "table_name": "test_table", @@ -193,6 +201,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "agg_dq", "rule": "col2_unique_value_gt_3", + "column_name": "col1", "expectation": "count(distinct col2)>3", "action_if_failed": "fail", "table_name": "test_table", @@ -205,6 +214,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "agg_dq", "rule": "col1_sum_gt_6_and_lt_10", + "column_name": "col1", "expectation": "sum(col1)>6 and sum(col1)<10", "action_if_failed": "fail", "table_name": "test_table", @@ -219,6 +229,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "query_dq", "rule": "table_row_count_gt_1", + "column_name": "col1", "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>1", "enable_querydq_custom_output": True, "action_if_failed": "ignore", @@ -234,6 +245,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "query_dq", "rule": "table_distinct_count", + "column_name": "col1", "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>3", "enable_querydq_custom_output": False, "action_if_failed": "fail", @@ -254,66 +266,67 @@ def fixture_expectations(): def fixture_agg_dq_detailed_expected_result(): # define the expected result for row dq operations return { - "result": + "result": { "product_id": "product_1", "table_name": "test_table", "rule_type": "agg_dq", "rule": "col1_sum_gt_eq_6", + "column_name": "col1", "expectation": "sum(col1)>=6", "tag": "validity", "status": "pass", "description": "col1 sum gt 1", - "actual_value" : 6, - "expected_value" : '>=6' - + "actual_value": 6, + "expected_value": '>=6' + }, - "result_query_dq": + "result_query_dq": { "product_id": "product_1", "table_name": "test_table", "rule_type": "query_dq", "rule": "table_row_count_gt_1", + "column_name": "col1", "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>1", "tag": "validity", "status": "fail", "description": "table count should be greater than 1", - "actual_value" : 0, - "expected_value" : '>1' - + "actual_value": 0, + "expected_value": '>1' + }, - "result_without_context": + 
"result_without_context": { "product_id": "product_1", "table_name": "test_table", "rule_type": "agg_dq", "rule": "col1_sum_gt_eq_6", + "column_name": "col1", "expectation": "sum(col1)>=6", "tag": "validity", "status": None, "description": "col1 sum gt 1", - "actual_value" : None, - "expected_value" : None - + "actual_value": None, + "expected_value": None + }, - "result_without_context1": + "result_without_context1": { "product_id": "product_1", "table_name": "test_table", "rule_type": "agg_dq", - "rule":"col1_sum_gt_6_and_lt_10", + "rule": "col1_sum_gt_6_and_lt_10", + "column_name": "col1", "expectation": "sum(col1)>6 and sum(col1)<10", "tag": "validity", "status": "fail", "description": "sum of col1 is greater than 6 and sum of col1 is less than 10", - "actual_value" : 6, - "expected_value" : '6>6 and 6<10' - - } - } - - + "actual_value": 6, + "expected_value": '6>6 and 6<10' + } + } @pytest.fixture(name="_fixture_row_dq_expected_result") @@ -370,13 +383,13 @@ def fixture_agg_dq_expected_result(): "tag": "accuracy", "description": "col2 unique value grater than 3" }, - { - "rule_type": "agg_dq", - "rule": "col1_sum_gt_6_and_lt_10", - "action_if_failed": "fail", - "tag": "accuracy", - "description": "sum of col1 value grater than 6 and less than 10" - } + { + "rule_type": "agg_dq", + "rule": "col1_sum_gt_6_and_lt_10", + "action_if_failed": "fail", + "tag": "accuracy", + "description": "sum of col1 value grater than 6 and less than 10" + } ] } @@ -387,22 +400,22 @@ def fixture_query_dq_expected_result(): return { "result": [ - { - 'rule': 'table_row_count_gt_1', - 'description': 'table count should be greater than 1', - 'rule_type': 'query_dq', - 'tag': 'validity', - 'action_if_failed': 'ignore' - }, - { - 'rule': 'table_distinct_count', - 'description': 'table distinct row count should be greater than 3', - 'rule_type': 'query_dq', - 'tag': 'accuracy', - 'action_if_failed': 'fail' - } - ] - } + { + 'rule': 'table_row_count_gt_1', + 'description': 'table count should be greater than 1', + 'rule_type': 'query_dq', + 'tag': 'validity', + 'action_if_failed': 'ignore' + }, + { + 'rule': 'table_distinct_count', + 'description': 'table distinct row count should be greater than 3', + 'rule_type': 'query_dq', + 'tag': 'accuracy', + 'action_if_failed': 'fail' + } + ] + } import pytest @@ -475,140 +488,146 @@ def compare_result(_actual_output, _expected_output): compare_result(actual_output, expected_output) - - @pytest.mark.parametrize("_query_dq_rule, query_dq_detailed_expected_result, _source_dq_status,_target_dq_status", [ # expectations rule ({ - "product_id": "product_1", - "rule_type": "query_dq", - "rule": "table_row_count_gt_1", - "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)", - "enable_querydq_custom_output": True, - "action_if_failed": "ignore", - "table_name": "test_table", - "tag": "validity", - "enable_for_target_dq_validation": True, - "enable_for_source_dq_validation": True, - "description": "table count should be greater than 1", - "expectation_source_f1": "select count(*) from query_test_table", - "expectation_target_f1": "select count(*) from query_test_table_target" - }, + "product_id": "product_1", + "rule_type": "query_dq", + "rule": "table_row_count_gt_1", + "column_name": "col1", + "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)", + "enable_querydq_custom_output": True, + 
"action_if_failed": "ignore", + "table_name": "test_table", + "tag": "validity", + "enable_for_target_dq_validation": True, + "enable_for_source_dq_validation": True, + "description": "table count should be greater than 1", + "expectation_source_f1": "select count(*) from query_test_table", + "expectation_target_f1": "select count(*) from query_test_table_target" + }, # result in spark col object { - "product_id": "product_1", - "table_name": "test_table", - "rule_type": "query_dq", - "rule": "table_row_count_gt_1", - "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)", - "tag": "validity", - "status": "fail", - "description": "table count should be greater than 1", - "actual_value" : 0, - "expected_value" : '>3' - },True,False), + "product_id": "product_1", + "table_name": "test_table", + "rule_type": "query_dq", + "rule": "table_row_count_gt_1", + "column_name": "col1", + "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)", + "tag": "validity", + "status": "fail", + "description": "table count should be greater than 1", + "actual_value": 0, + "expected_value": '>3' + }, True, False), # expectations rule ({ - "product_id": "product_1", - "rule_type": "query_dq", - "rule": "table_distinct_count", - "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))", - "enable_querydq_custom_output": False, - "action_if_failed": "fail", - "table_name": "test_table", - "tag": "accuracy", - "enable_for_target_dq_validation": True, - "enable_for_source_dq_validation": True, - "description": "table distinct row count should be greater than 3", - "expectation_source_f1": "select count(*) from (select distinct col1, col2 from query_test_table)", - "expectation_target_f1": "select count(*) from (select distinct col1, col2 from query_test_table_target)" - }, + "product_id": "product_1", + "rule_type": "query_dq", + "rule": "table_distinct_count", + "column_name": "col1", + "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))", + "enable_querydq_custom_output": False, + "action_if_failed": "fail", + "table_name": "test_table", + "tag": "accuracy", + "enable_for_target_dq_validation": True, + "enable_for_source_dq_validation": True, + "description": "table distinct row count should be greater than 3", + "expectation_source_f1": "select count(*) from (select distinct col1, col2 from query_test_table)", + "expectation_target_f1": "select count(*) from (select distinct col1, col2 from query_test_table_target)" + }, # result in spark col object { - "product_id": "product_1", - "table_name": "test_table", - "rule_type": "query_dq", - "rule": "table_distinct_count", - "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))", - "tag": "accuracy", - "status": "fail", - "description": "table distinct row count should be greater than 3", - 
"actual_value" : 0, - "expected_value" : '>3' - },False, True - ), + "product_id": "product_1", + "table_name": "test_table", + "rule_type": "query_dq", + "rule": "table_distinct_count", + "column_name": "col1", + "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))", + "tag": "accuracy", + "status": "fail", + "description": "table distinct row count should be greater than 3", + "actual_value": 0, + "expected_value": '>3' + }, False, True + ), ]) def test_agg_query_dq_detailed_result_with_querdq_v2(_fixture_df, - _query_dq_rule, - query_dq_detailed_expected_result, - _fixture_mock_context,_source_dq_status,_target_dq_status): - + _query_dq_rule, + query_dq_detailed_expected_result, + _fixture_mock_context, _source_dq_status, _target_dq_status): _fixture_df.createOrReplaceTempView("query_test_table") _fixture_df.createOrReplaceTempView("query_test_table_target") - result_out,result_output = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, _query_dq_rule,_fixture_df,[],_source_dq_status=_source_dq_status,_target_dq_status=_target_dq_status - ) - print("result_df:",result_output) - print("query_dq_detailed_expected_result:",query_dq_detailed_expected_result) - + result_out, result_output = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, + _query_dq_rule, _fixture_df, [], + _source_dq_status=_source_dq_status, + _target_dq_status=_target_dq_status + ) + print("result_df:", result_output) + print("query_dq_detailed_expected_result:", query_dq_detailed_expected_result) assert result_output[1] == query_dq_detailed_expected_result.get("product_id") assert result_output[2] == query_dq_detailed_expected_result.get("table_name") assert result_output[3] == query_dq_detailed_expected_result.get("rule_type") assert result_output[4] == query_dq_detailed_expected_result.get("rule") - assert result_output[5] == query_dq_detailed_expected_result.get("expectation") - assert result_output[6] == query_dq_detailed_expected_result.get("tag") - assert result_output[7] == query_dq_detailed_expected_result.get("description") - assert result_output[8] == query_dq_detailed_expected_result.get("status") - - assert result_output[9] == query_dq_detailed_expected_result.get("actual_value") - assert result_output[10] == query_dq_detailed_expected_result.get("expected_value") + assert result_output[5] == query_dq_detailed_expected_result.get("column_name") + assert result_output[6] == query_dq_detailed_expected_result.get("expectation") + assert result_output[7] == query_dq_detailed_expected_result.get("tag") + assert result_output[8] == query_dq_detailed_expected_result.get("description") + assert result_output[9] == query_dq_detailed_expected_result.get("status") + assert result_output[10] == query_dq_detailed_expected_result.get("actual_value") + assert result_output[11] == query_dq_detailed_expected_result.get("expected_value") @pytest.mark.parametrize("_query_dq_rule_exception", [ # expectations rule ({ - "product_id": "product_1", - "rule_type": "query_dq", - "rule": "table_row_count_gt_1", - "expectation": "(select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)", - "enable_querydq_custom_output": True, - "action_if_failed": "ignore", - "table_name": "test_table", - "tag": "validity", - 
"enable_for_target_dq_validation": True, - "description": "table count should be greater than 1", - "expectation_source_f1": "select count(*) from query_test_table", - "expectation_target_f1": "select count(*) from query_test_table_target" + "product_id": "product_1", + "rule_type": "query_dq", + "rule": "table_row_count_gt_1", + "column_name": "col1", + "expectation": "(select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)", + "enable_querydq_custom_output": True, + "action_if_failed": "ignore", + "table_name": "test_table", + "tag": "validity", + "enable_for_target_dq_validation": True, + "description": "table count should be greater than 1", + "expectation_source_f1": "select count(*) from query_test_table", + "expectation_target_f1": "select count(*) from query_test_table_target" } ), # expectations rule ({ - "product_id": "product_1", - "rule_type": "query_dq", - "rule": "table_distinct_count", - "expectation": "(select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))", - "enable_querydq_custom_output": False, - "action_if_failed": "fail", - "table_name": "test_table", - "tag": "accuracy", - "enable_for_target_dq_validation": True, - "enable_for_source_dq_validation": True, - "description": "table distinct row count should be greater than 3", - "expectation_source_f1": "select count(*) from (select distinct col1, col2 from query_test_table)", - "expectation_target_f1": "select count(*) from (select distinct col1, col2 from query_test_table_target)" + "product_id": "product_1", + "rule_type": "query_dq", + "rule": "table_distinct_count", + "column_name": "col1", + "expectation": "(select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))", + "enable_querydq_custom_output": False, + "action_if_failed": "fail", + "table_name": "test_table", + "tag": "accuracy", + "enable_for_target_dq_validation": True, + "enable_for_source_dq_validation": True, + "description": "table distinct row count should be greater than 3", + "expectation_source_f1": "select count(*) from (select distinct col1, col2 from query_test_table)", + "expectation_target_f1": "select count(*) from (select distinct col1, col2 from query_test_table_target)" } - ), - + ), + ]) def test_agg_query_dq_detailed_result_exception_v2(_fixture_df, - _query_dq_rule_exception,_fixture_mock_context): + _query_dq_rule_exception, _fixture_mock_context): # faulty user input is given to test the exception functionality of the agg_query_dq_detailed_result _fixture_df.createOrReplaceTempView("query_test_table") _fixture_df.createOrReplaceTempView("query_test_table_target") with pytest.raises(SparkExpectationsMiscException, match=r"(error occurred while running agg_query_dq_detailed_result Sql query is invalid. *)|(error occurred while running agg_query_dq_detailed_result Regex match not found. 
*)"): - SparkExpectationsActions().agg_query_dq_detailed_result(_fixture_mock_context, _query_dq_rule_exception,_fixture_df,[] ) + SparkExpectationsActions().agg_query_dq_detailed_result(_fixture_mock_context, _query_dq_rule_exception, + _fixture_df, []) @pytest.mark.parametrize("input_df, rule_type_name, expected_output", @@ -646,7 +665,8 @@ def test_create_agg_dq_results(input_df, rule_type_name, expected_output, _fixture_mock_context): # unit test case on create_agg_dq_results - assert SparkExpectationsActions().create_agg_dq_results(_fixture_mock_context,input_df, rule_type_name, ) == expected_output + assert SparkExpectationsActions().create_agg_dq_results(_fixture_mock_context, input_df, + rule_type_name, ) == expected_output @pytest.mark.parametrize("input_df", @@ -667,100 +687,107 @@ def test_create_agg_dq_results_exception(input_df, def test_agg_query_dq_detailed_result_exception(_fixture_df, - _fixture_query_dq_rule): + _fixture_query_dq_rule): _mock_object_context = Mock(spec=SparkExpectationsContext) # faulty user input is given to test the exception functionality of the agg_query_dq_detailed_result - + with pytest.raises(SparkExpectationsMiscException, match=r"error occurred while running agg_query_dq_detailed_result .*"): - SparkExpectationsActions().agg_query_dq_detailed_result(_mock_object_context, "_fixture_query_dq_rule","",[] ) + SparkExpectationsActions().agg_query_dq_detailed_result(_mock_object_context, "_fixture_query_dq_rule", "", + []) def test_agg_query_dq_detailed_result(_fixture_df, - _fixture_agg_dq_rule, - _fixture_agg_dq_detailed_expected_result, - _fixture_mock_context): - result_out,result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, _fixture_agg_dq_rule,_fixture_df,[] - ) - - + _fixture_agg_dq_rule, + _fixture_agg_dq_detailed_expected_result, + _fixture_mock_context): + result_out, result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, + _fixture_agg_dq_rule, _fixture_df, [] + ) + assert result_df[1] == _fixture_agg_dq_detailed_expected_result.get("result").get("product_id") assert result_df[2] == _fixture_agg_dq_detailed_expected_result.get("result").get("table_name") assert result_df[3] == _fixture_agg_dq_detailed_expected_result.get("result").get("rule_type") assert result_df[4] == _fixture_agg_dq_detailed_expected_result.get("result").get("rule") - assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result").get("expectation") - assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result").get("tag") - assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result").get("description") - assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result").get("status") - - assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result").get("actual_value") - assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result").get("expected_value") + assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result").get("column_name") + assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result").get("expectation") + assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result").get("tag") + assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result").get("description") + assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result").get("status") + + assert result_df[10] == 
_fixture_agg_dq_detailed_expected_result.get("result").get("actual_value") + assert result_df[11] == _fixture_agg_dq_detailed_expected_result.get("result").get("expected_value") def test_agg_query_dq_detailed_result_with_range_rule_type(_fixture_df, - _fixture_agg_dq_rule_type_range, - _fixture_agg_dq_detailed_expected_result, - _fixture_mock_context): - result_out,result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, _fixture_agg_dq_rule_type_range,_fixture_df,[] - ) - + _fixture_agg_dq_rule_type_range, + _fixture_agg_dq_detailed_expected_result, + _fixture_mock_context): + result_out, result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, + _fixture_agg_dq_rule_type_range, + _fixture_df, [] + ) + assert result_df[1] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("product_id") assert result_df[2] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("table_name") assert result_df[3] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("rule_type") assert result_df[4] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("rule") - assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("expectation") - assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("tag") - assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("description") - assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("status") - - assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("actual_value") - assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("expected_value") + assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("column_name") + assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("expectation") + assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("tag") + assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("description") + assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("status") + + assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("actual_value") + assert result_df[11] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get( + "expected_value") def test_agg_query_dq_detailed_result_with_querdq(_fixture_df, - _fixture_query_dq_rule, - _fixture_agg_dq_detailed_expected_result, - _fixture_mock_context): - + _fixture_query_dq_rule, + _fixture_agg_dq_detailed_expected_result, + _fixture_mock_context): _fixture_df.createOrReplaceTempView("query_test_table") _fixture_df.createOrReplaceTempView("query_test_table_target") - result_out,result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, _fixture_query_dq_rule,_fixture_df,[] - ) - + result_out, result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, + _fixture_query_dq_rule, _fixture_df, + [] + ) + assert result_df[1] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("product_id") assert result_df[2] == 
_fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("table_name") assert result_df[3] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("rule_type") assert result_df[4] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("rule") - assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("expectation") - assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("tag") - assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("description") - assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("status") - - assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("actual_value") - assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("expected_value") + assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("column_name") + assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("expectation") + assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("tag") + assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("description") + assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("status") + + assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("actual_value") + assert result_df[11] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("expected_value") def test_agg_query_dq_detailed_result_without_detailed_context(_fixture_df, - _fixture_agg_dq_rule, - _fixture_agg_dq_detailed_expected_result, - _fixture_mock_context_without_detailed_stats): - result_out,result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context_without_detailed_stats, _fixture_agg_dq_rule,_fixture_df,[] - ) - + _fixture_agg_dq_rule, + _fixture_agg_dq_detailed_expected_result, + _fixture_mock_context_without_detailed_stats): + result_out, result_df = SparkExpectationsActions.agg_query_dq_detailed_result( + _fixture_mock_context_without_detailed_stats, _fixture_agg_dq_rule, _fixture_df, [] + ) - assert result_df[1] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("product_id") assert result_df[2] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("table_name") assert result_df[3] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("rule_type") assert result_df[4] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("rule") - assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("expectation") - assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("tag") - assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("description") - assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("status") - - assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("actual_value") - assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("expected_value") + assert result_df[5] == 
_fixture_agg_dq_detailed_expected_result.get("result_without_context").get("column_name") + assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("expectation") + assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("tag") + assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("description") + assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("status") + assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("actual_value") + assert result_df[11] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("expected_value") def test_run_dq_rules_row(_fixture_df, @@ -791,14 +818,14 @@ def test_run_dq_rules_row(_fixture_df, (True, False), (False, True), ]) - def test_run_dq_rules_agg(_fixture_df, _fixture_expectations, _fixture_agg_dq_expected_result, - _fixture_mock_context,agg_dq_source_dq_status,agg_dq_target_dq_status): + _fixture_mock_context, agg_dq_source_dq_status, agg_dq_target_dq_status): # Apply the data quality rules - result_df = SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _fixture_expectations,"agg_dq",agg_dq_source_dq_status,agg_dq_target_dq_status) + result_df = SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _fixture_expectations, + "agg_dq", agg_dq_source_dq_status, agg_dq_target_dq_status) # Assert that the result dataframe has the expected number of columns assert len(result_df.columns) == 1 @@ -810,8 +837,6 @@ def test_run_dq_rules_agg(_fixture_df, assert row.meta_agg_dq_results == _fixture_agg_dq_expected_result.get("result") - - @pytest.mark.parametrize("query_dq_source_dq_status,query_dq_target_dq_status", [ (True, False), (False, True), @@ -819,12 +844,13 @@ def test_run_dq_rules_agg(_fixture_df, def test_run_dq_rules_query(_fixture_df, _fixture_expectations, _fixture_query_dq_expected_result, - _fixture_mock_context,query_dq_source_dq_status,query_dq_target_dq_status): + _fixture_mock_context, query_dq_source_dq_status, query_dq_target_dq_status): # Apply the data quality rules _fixture_df.createOrReplaceTempView("query_test_table") _fixture_df.createOrReplaceTempView("query_test_table_target") - - result_df = SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _fixture_expectations,"query_dq",query_dq_source_dq_status,query_dq_target_dq_status) + + result_df = SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _fixture_expectations, + "query_dq", query_dq_source_dq_status, query_dq_target_dq_status) # Assert that the result dataframe has the expected number of columns assert len(result_df.columns) == 1 @@ -1535,6 +1561,7 @@ def test_run_dq_rules_condition_expression_exception(_fixture_df, { "rule_type": "query_dq", "rule": "table_row_count_gt_1", + "column_name": "col1", "expectation": "(select count(*) from query_test_table)>1", "action_if_failed": "ignore", "table_name": "test_table", @@ -1542,26 +1569,25 @@ def test_run_dq_rules_condition_expression_exception(_fixture_df, "enable_for_target_dq_validation": False, "description": "table count should be greater than 1" }, - - ], - } + + ], + } _fixture_df.createOrReplaceTempView("query_test_table") with pytest.raises(SparkExpectationsMiscException, match=r"error occurred while running expectations .*"): - SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, 
_expectations, - "query_dq", False, True) + "query_dq", False, True) @pytest.mark.parametrize("_rule_test", [ - + ({"rule_type": "query"}), - + ]) def test_run_dq_rules_condition_expression_dynamic_exception(_fixture_df, - _fixture_query_dq_expected_result, - _fixture_mock_context,_rule_test): + _fixture_query_dq_expected_result, + _fixture_mock_context, _rule_test): # Apply the data quality rules _expectations = {"query_rules": [ { @@ -1574,13 +1600,13 @@ def test_run_dq_rules_condition_expression_dynamic_exception(_fixture_df, "enable_for_target_dq_validation": False, "description": "table count should be greater than 1" }, - - ], - } + + ], + } _fixture_df.createOrReplaceTempView("query_test_table") with pytest.raises(SparkExpectationsMiscException, match=r"error occurred while running expectations .*"): - _rule_type= _rule_test.get("rule_type") + _rule_type = _rule_test.get("rule_type") SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _expectations, - _rule_type, False, True) + _rule_type, False, True) From dfb8d16398623344f4ce584014edb10e66499ffc Mon Sep 17 00:00:00 2001 From: Sudeepta pal <111543327+sudeep7978@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:39:03 +0530 Subject: [PATCH 5/9] updated for Schema Evolution with AutoMerge: Enabled Delta Lake's spark.databricks.delta.schema.autoMerge.enabled configuration to allow schema evolution during write operations. Modified the data quality framework to include the affected_column_name field dynamically if not already present. --- spark_expectations/examples/base_setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/spark_expectations/examples/base_setup.py b/spark_expectations/examples/base_setup.py index 3527b0d..33f82e7 100644 --- a/spark_expectations/examples/base_setup.py +++ b/spark_expectations/examples/base_setup.py @@ -134,6 +134,7 @@ def set_up_delta() -> SparkSession: .config("spark.sql.warehouse.dir", "/tmp/hive/warehouse") .config("spark.driver.extraJavaOptions", "-Dderby.system.home=/tmp/derby") .config("spark.jars.ivy", "/tmp/ivy2") + .config("spark.databricks.delta.schema.autoMerge.enabled", "true") ) spark = builder.getOrCreate() From c857252d45f0f1845115de157acec28ee475f270 Mon Sep 17 00:00:00 2001 From: Sudeepta pal <111543327+sudeep7978@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:51:48 +0530 Subject: [PATCH 6/9] Enabled Delta Lake's spark.databricks.delta.schema.autoMerge.enabled configuration to allow schema evolution during write operations. --- docs/delta.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/delta.md b/docs/delta.md index 2db8d45..d68abaa 100644 --- a/docs/delta.md +++ b/docs/delta.md @@ -19,6 +19,7 @@ builder = ( .config("spark.sql.warehouse.dir", "/tmp/hive/warehouse") .config("spark.driver.extraJavaOptions", "-Dderby.system.home=/tmp/derby") .config("spark.jars.ivy", "/tmp/ivy2") + .config("spark.databricks.delta.schema.autoMerge.enabled", "true") ) spark = builder.getOrCreate() ``` From 86e2bece74cd27615e84c00a28856a74b8d8016d Mon Sep 17 00:00:00 2001 From: Sudeepta pal <111543327+sudeep7978@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:56:42 +0530 Subject: [PATCH 7/9] adding additional column in the detailed table. 
---
 docs/getting-started/setup.md | 94 ++++++++++++++++++-----------------
 1 file changed, 48 insertions(+), 46 deletions(-)

diff --git a/docs/getting-started/setup.md b/docs/getting-started/setup.md
index e77435c..df4827f 100644
--- a/docs/getting-started/setup.md
+++ b/docs/getting-started/setup.md
@@ -156,29 +156,30 @@ product_id string, -- (2)!
 table_name string, -- (3)!
 rule_type string, -- (4)!
 rule string, -- (5)!
-source_expectations string, -- (6)!
-tag string, -- (7)!
-description string, -- (8)!
-source_dq_status string, -- (9)!
-source_dq_actual_outcome string, -- (10)!
-source_dq_expected_outcome string, -- (11)!
-source_dq_actual_row_count string, -- (12)!
-source_dq_error_row_count string, -- (13)!
-source_dq_row_count string, -- (14)!
-source_dq_start_time string, -- (15)!
-source_dq_end_time string, -- (16)!
-target_expectations string, -- (17)!
-target_dq_status string, -- (18)!
-target_dq_actual_outcome string, -- (19)!
-target_dq_expected_outcome string, -- (20)!
-target_dq_actual_row_count string, -- (21)!
-target_dq_error_row_count string, -- (22)!
-target_dq_row_count string, -- (23)!
-target_dq_start_time string, -- (24)!
-target_dq_end_time string, -- (25)!
-dq_date date, -- (26)!
-dq_time string, -- (27)!
-dq_job_metadata_info string, -- (28)!
+column_name string, -- (6)!
+source_expectations string, -- (7)!
+tag string, -- (8)!
+description string, -- (9)!
+source_dq_status string, -- (10)!
+source_dq_actual_outcome string, -- (11)!
+source_dq_expected_outcome string, -- (12)!
+source_dq_actual_row_count string, -- (13)!
+source_dq_error_row_count string, -- (14)!
+source_dq_row_count string, -- (15)!
+source_dq_start_time string, -- (16)!
+source_dq_end_time string, -- (17)!
+target_expectations string, -- (18)!
+target_dq_status string, -- (19)!
+target_dq_actual_outcome string, -- (20)!
+target_dq_expected_outcome string, -- (21)!
+target_dq_actual_row_count string, -- (22)!
+target_dq_error_row_count string, -- (23)!
+target_dq_row_count string, -- (24)!
+target_dq_start_time string, -- (25)!
+target_dq_end_time string, -- (26)!
+dq_date date, -- (27)!
+dq_time string, -- (28)!
+dq_job_metadata_info string, -- (29)!
 );
 ```
 
@@ -187,26 +188,27 @@ dq_job_metadata_info string, -- (28)!
 3. `table_name` The target table where the final data gets inserted
 4. `rule_type` Either row/query/agg dq
 5. `rule` Rule name
-6. `source_expectations` Actual Rule to be executed on the source dq
-7. `tag` completeness,uniqueness,validity,accuracy,consistency,
-8. `description` Description of the Rule
-9. `source_dq_status` Status of the rule execution in the Source dq
-10. `source_dq_actual_outcome` Actual outcome of the Source dq check
-11. `source_dq_expected_outcome` Expected outcome of the Source dq check
-12. `source_dq_actual_row_count` Number of rows of the source dq
-13. `source_dq_error_row_count` Number of rows failed in the source dq
-14. `source_dq_row_count` Number of rows of the source dq
-15. `source_dq_start_time` source dq start timestamp
-16. `source_dq_end_time` source dq end timestamp
-17. `target_expectations` Actual Rule to be executed on the target dq
-18. `target_dq_status` Status of the rule execution in the Target dq
-19. `target_dq_actual_outcome` Actual outcome of the Target dq check
-20. `target_dq_expected_outcome` Expected outcome of the Target dq check
-21. `target_dq_actual_row_count` Number of rows of the target dq
-22. `target_dq_error_row_count` Number of rows failed in the target dq
-23. `target_dq_row_count` Number of rows of the target dq
-24. 
`target_dq_start_time` target dq start timestamp -25. `target_dq_end_time` target dq end timestamp -26. `dq_date` Dq executed date -27. `dq_time` Dq executed timestamp -28. `dq_job_metadata_info` dq job metadata +6. `column_name` column name where the rule got executed +7. `source_expectations` Actual Rule to be executed on the source dq +8. `tag` completeness,uniqueness,validity,accuracy,consistency, +9. `description` Description of the Rule +10. `source_dq_status` Status of the rule execution in the Source dq +11. `source_dq_actual_outcome` Actual outcome of the Source dq check +12. `source_dq_expected_outcome` Expected outcome of the Source dq check +13. `source_dq_actual_row_count` Number of rows of the source dq +14. `source_dq_error_row_count` Number of rows failed in the source dq +15. `source_dq_row_count` Number of rows of the source dq +16. `source_dq_start_time` source dq start timestamp +17. `source_dq_end_time` source dq end timestamp +18. `target_expectations` Actual Rule to be executed on the target dq +19. `target_dq_status` Status of the rule execution in the Target dq +20. `target_dq_actual_outcome` Actual outcome of the Target dq check +21. `target_dq_expected_outcome` Expected outcome of the Target dq check +22. `target_dq_actual_row_count` Number of rows of the target dq +23. `target_dq_error_row_count` Number of rows failed in the target dq +24. `target_dq_row_count` Number of rows of the target dq +25. `target_dq_start_time` target dq start timestamp +26. `target_dq_end_time` target dq end timestamp +27. `dq_date` Dq executed date +28. `dq_time` Dq executed timestamp +29. `dq_job_metadata_info` dq job metadata From f2cf1312302006984af9ae9e3334e4f030e9baf7 Mon Sep 17 00:00:00 2001 From: Sudeepta pal <111543327+sudeep7978@users.noreply.github.com> Date: Thu, 19 Dec 2024 23:29:09 +0530 Subject: [PATCH 8/9] Update CONTRIBUTORS.md --- CONTRIBUTORS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 2f9577a..bebac14 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -14,6 +14,7 @@ Thanks to the contributors who helped on this project apart from the authors * [Vigneshwarr Venkatesan](https://www.linkedin.com/in/vignesh15) * [Nishant Singh](https://www.linkedin.com/in/singh-nishant/) * [Amaldev Kunnel](https://www.linkedin.com/in/amaldev-k-40222680) +* [Raghavendra H S](https://www.linkedin.com/in/raghavendra-h-s-01786332/) * [Sudeepta pal](https://www.linkedin.com/in/sudeepta-pal-98b393217/) * [Mallikarjunudu Tirumalasetti](https://www.linkedin.com/in/mtirumal/) * [Tadakala sai vamsi goud](https://www.linkedin.com/in/sai-vamsi-goud-455737169/) From ca7ca7f2c8eab523db1fa12d2b4f785696a81456 Mon Sep 17 00:00:00 2001 From: spal40 Date: Mon, 20 Jan 2025 09:07:01 +0530 Subject: [PATCH 9/9] changes --- spark_expectations/config/user_config.py | 3 +- spark_expectations/core/context.py | 44 +++- spark_expectations/core/expectations.py | 7 +- .../examples/sample_dq_delta.py | 26 +- spark_expectations/sinks/utils/writer.py | 16 ++ spark_expectations/utils/alert.py | 117 +++++++++ spark_expectations/utils/report.py | 90 +++++++ .../advanced_email_alert_template.jinja | 228 ++++++++++++++++++ 8 files changed, 518 insertions(+), 13 deletions(-) create mode 100644 spark_expectations/utils/alert.py create mode 100644 spark_expectations/utils/report.py create mode 100644 spark_expectations/utils/templates/advanced_email_alert_template.jinja diff --git a/spark_expectations/config/user_config.py b/spark_expectations/config/user_config.py index 
ada3700..cc873eb 100644 --- a/spark_expectations/config/user_config.py +++ b/spark_expectations/config/user_config.py @@ -5,6 +5,7 @@ class Constants: # declare const user config variables for email notification se_notifications_enable_email = "spark.expectations.notifications.email.enabled" + se_enable_observability = "spark.expectations.observability.enabled" se_notifications_enable_custom_email_body = ( "spark.expectations.notifications.enable.custom.email.body" ) @@ -16,7 +17,7 @@ class Constants: ) se_notifications_email_from = "spark.expectations.notifications.email.from" se_notifications_email_to_other_mail_id = ( - "spark.expectations.notifications.email.to.other.mail.com" + "sudeepta.pal@nike.com" ) se_notifications_email_subject = "spark.expectations.notifications.email.subject" se_notifications_email_custom_body = ( diff --git a/spark_expectations/core/context.py b/spark_expectations/core/context.py index 6c35155..e0ae1af 100644 --- a/spark_expectations/core/context.py +++ b/spark_expectations/core/context.py @@ -25,6 +25,7 @@ def __post_init__(self) -> None: self._run_id: str = f"{self.product_id}_{uuid1()}" self._run_date: str = self.set_run_date() self._dq_stats_table_name: Optional[str] = None + self._dq_stats_report_table_name: Optional[str] = None self._dq_detailed_stats_table_name: Optional[str] = None self._final_table_name: Optional[str] = None self._error_table_name: Optional[str] = None @@ -142,6 +143,7 @@ def __post_init__(self) -> None: self._target_and_error_table_writer_config: dict = {} self._stats_table_writer_config: dict = {} + self._report_table_config: dict = {} # The below config is user config and will be enabled if detailed result is required for agg and query dq self._enable_agg_dq_detailed_result: bool = False @@ -196,6 +198,25 @@ def get_dq_stats_table_name(self) -> str: """The spark expectations context is not set completely, please assign '_dq_stats_table_name' before accessing it""" ) + def set_dq_stats_report_table_name(self,dq_stats_report_table_name: str) -> None: + self._dq_stats_report_table_name = dq_stats_report_table_name + + + + @property + def get_dq_stats_report_table_name(self) -> str: + """ + Get dq_stats_table_name to which the final stats of the dq job will be written into + + Returns: + str: returns the dq_stats_table_name + """ + if self._dq_stats_report_table_name: + return self._dq_stats_report_table_name + raise SparkExpectationsMiscException( + """The spark expectations context is not set completely, please assign '_dq_stats_report_table_name' before + accessing it""" + ) def set_dq_expectations(self, dq_expectations: dict) -> None: self._dq_expectations = dq_expectations @@ -1844,7 +1865,7 @@ def set_dq_detailed_stats_table_name( self._dq_detailed_stats_table_name = dq_detailed_stats_table_name @property - def get_dq_detailed_stats_table_name(self) -> str: + def get_dq_detailed_stats_table_name(self) -> str: """ Get dq_stats_table_name to which the final stats of the dq job will be written into @@ -1885,6 +1906,23 @@ def get_query_dq_output_custom_table_name(self) -> str: '_dq_detailed_stats_table_name,query_dq_detailed_stats_status' before accessing it""" ) + def set_report_table_config(self, config: dict) -> None: + """ + This function sets report table config + Args: + config: dict + Returns: None + """ + self._report_table_config = config + + @property + def get_report_table_config(self) -> dict: + """ + This function returns report table config + Returns: + dict: Returns report_table_config which in dict + """ + return 
self._report_table_config def set_detailed_stats_table_writer_config(self, config: dict) -> None: """ @@ -2062,3 +2100,7 @@ def get_stats_dict(self) -> Optional[List[Dict[str, Any]]]: Optional[List[Dict[str, Any]]]: Returns the stats_dict if it exists, otherwise None """ return self._stats_dict if hasattr(self, "_stats_dict") else None + + @property + def report_table_config(self): + return self._report_table_config diff --git a/spark_expectations/core/expectations.py b/spark_expectations/core/expectations.py index 1ec3220..a190b6a 100644 --- a/spark_expectations/core/expectations.py +++ b/spark_expectations/core/expectations.py @@ -343,7 +343,12 @@ def _except(func: Any) -> Any: else False ) - _job_metadata: str = user_config.se_job_metadata + # _job_metadata: str = user_config.se_job_metadata + _job_metadata: str = ( + str(_notification_dict[user_config.se_job_metadata]) + if isinstance(_notification_dict[user_config.se_job_metadata], str) + else None + ) notifications_on_error_drop_threshold = _notification_dict.get( user_config.se_notifications_on_error_drop_threshold, 100 diff --git a/spark_expectations/examples/sample_dq_delta.py b/spark_expectations/examples/sample_dq_delta.py index 1bad471..78d627c 100644 --- a/spark_expectations/examples/sample_dq_delta.py +++ b/spark_expectations/examples/sample_dq_delta.py @@ -15,8 +15,9 @@ spark = set_up_delta() dic_job_info = { - "job": "job_name", + "job": "na_CORL_DIGITAL_source_to_o9", "Region": "NA", + "env": "dev", "Snapshot": "2024-04-15", } job_info = str(dic_job_info) @@ -32,12 +33,13 @@ ) user_conf = { - user_config.se_notifications_enable_email: True, - user_config.se_notifications_enable_custom_email_body: True, - user_config.se_notifications_email_smtp_host: "mailhost.com", - user_config.se_notifications_email_smtp_port: 25, - user_config.se_notifications_email_from: "", - user_config.se_notifications_email_to_other_mail_id: "", + user_config.se_enable_observability:"Enable", + user_config.se_notifications_enable_email: False, + user_config.se_notifications_enable_custom_email_body: False, + user_config.se_notifications_email_smtp_host: "smtp.office365.com", + user_config.se_notifications_email_smtp_port: 587, + user_config.se_notifications_email_from: "sudeepta.pal@nike.com", + user_config.se_notifications_email_to_other_mail_id: "sudeepta.pal@nike.com", user_config.se_notifications_email_subject: "spark expectations - data quality - notifications", user_config.se_notifications_email_custom_body: """Spark Expectations Statistics for this dq run: 'product_id': {}, @@ -116,12 +118,16 @@ def build_new() -> DataFrame: spark.sql("select * from dq_spark_dev.dq_stats").show(truncate=False) spark.sql("select * from dq_spark_dev.dq_stats_detailed").show(truncate=False) spark.sql("select * from dq_spark_dev.dq_stats_querydq_output").show(truncate=False) - spark.sql("select * from dq_spark_dev.dq_stats").printSchema() - spark.sql("select * from dq_spark_dev.dq_stats_detailed").printSchema() - spark.sql("select * from dq_spark_dev.customer_order").show(truncate=False) + _log.info("BELOW IS THE REPORT TABLE") + + spark.sql("select * from dq_spark_dev.dq_obs_report_data").show(truncate=False) + # spark.sql("select count(*) from dq_spark_local.customer_order_error ").show( # truncate=False # ) + if user_config.se_enable_observability == "spark.expectations.observability.enabled": + _log.info("alert_send_successfully") + _log.info("stats data in the kafka topic") # display posted statistics from the kafka topic diff --git 
a/spark_expectations/sinks/utils/writer.py b/spark_expectations/sinks/utils/writer.py index 7013364..9203966 100644 --- a/spark_expectations/sinks/utils/writer.py +++ b/spark_expectations/sinks/utils/writer.py @@ -23,6 +23,7 @@ SparkExpectationsMiscException, ) from spark_expectations.secrets import SparkExpectationsSecretsBackend +from spark_expectations.utils.alert import AlertTrial from spark_expectations.utils.udf import remove_empty_maps from spark_expectations.core.context import SparkExpectationsContext from spark_expectations.sinks import _sink_hook @@ -592,6 +593,21 @@ def write_detailed_stats(self) -> None: raise SparkExpectationsMiscException( f"error occurred while saving the data into the stats table {e}" ) + print("------------------------------------------########################################################################spark expectation ending here.#####################------------------------------------------") + print(user_config.se_enable_observability) + # Call the dq_obs_report_data_insert method from report.py + if user_config.se_enable_observability=="spark.expectations.observability.enabled": + from spark_expectations.utils.report import SparkExpectationsReport + context = self._context + # report = SparkExpectationsReport(_context=context) + # report.dq_obs_report_data_insert(_df_detailed_stats,_df_custom_detailed_stats_source) + from spark_expectations.utils.alert import AlertTrial + alert = AlertTrial(self._context) + alert.get_report_data(_df_detailed_stats, _df_custom_detailed_stats_source) + + + + def write_error_stats(self) -> None: """ diff --git a/spark_expectations/utils/alert.py b/spark_expectations/utils/alert.py new file mode 100644 index 0000000..230fad8 --- /dev/null +++ b/spark_expectations/utils/alert.py @@ -0,0 +1,117 @@ +import os +from dataclasses import dataclass +from typing import Dict, Tuple +from pyspark.sql import SparkSession, DataFrame +import smtplib +import traceback +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from os import getenv +from jinja2 import Environment, FileSystemLoader +import re + +from pyspark.sql.types import StructType, StructField, StringType + +from spark_expectations.core.context import SparkExpectationsContext + +@dataclass +class AlertTrial: + """ + This class implements the alert trial functionality. + """ + _context: SparkExpectationsContext + + def __post_init__(self) -> None: + self.spark = self._context.spark + + def send_mail(self, body: str, subject: str, receivers_list: str) -> None: + """ + This function is to send the DQ report to the users. + + Args: + body: Email body. + subject: Email subject. + receivers_list: List of email receivers. 
+ """ + try: + cerberus_url = 'https://prod.cerberus.nikecloud.com/' + cerberus_sdb_path = "app/if-common/smtp" + smtp_details = {'a.dsm.pss.obs': 'wp=Wq$37#UI?Ijy7_HNU', 'a.e2e.map.smtp': 'wp=Wq$37#UI?Ijy7_HNU'} + + SMTP_USER_NAME = list(smtp_details.keys())[0] + service_account_email = f"{SMTP_USER_NAME}@nike.com" + service_account_password = smtp_details.get(SMTP_USER_NAME) + body = MIMEText(body, 'html') + msg = MIMEMultipart() + msg.attach(body) + msg['Subject'] = subject + msg['From'] = service_account_email + msg['To'] = receivers_list + + smtp_host = getenv('SMTP_HOST') or "smtp.office365.com" + smtp_port = getenv('SMTP_PORT') or 587 + + with smtplib.SMTP(smtp_host, port=smtp_port) as smtp_server: + smtp_server.ehlo() + smtp_server.starttls() + smtp_server.login(service_account_email, service_account_password) + smtp_server.sendmail(msg['From'], receivers_list.split(','), msg.as_string()) + print("Report sent successfully!") + except Exception as e: + print(f"Error in send_mail: {e}") + traceback.print_exc() + + def get_report_data(self, df_detailed: DataFrame, df_query_output: DataFrame) -> None: + """ + This function calls the dq_obs_report_data_insert method from SparkExpectationsReport. + + Args: + df_detailed: Detailed DataFrame. + df_query_output: Query output DataFrame. + """ + try: + from spark_expectations.utils.report import SparkExpectationsReport + + report = SparkExpectationsReport(self._context) + df = report.dq_obs_report_data_insert(df_detailed, df_query_output) + df.write.mode("overwrite").saveAsTable("dq_obs_report_data") + + print("success lets redesign the report") + df.show() + template_dir = os.path.join(os.path.dirname(__file__), 'templates') + env_loader = Environment(loader=FileSystemLoader(template_dir)) + template = env_loader.get_template('advanced_email_alert_template.jinja') + df_data = [row.asDict() for row in df.collect()] + headers = list(df.columns) + rows = [row.asDict().values() for row in df.collect()] + print("df_data") + print(df_data) + + html_output = template.render( + title='central_repo_test_table', + columns=headers, + table_rows=rows, + product_id='12345', + data_object_name='Sample Data Object', + snapshot_date='2023-10-01', + region_code='US', + dag_name='Sample DAG', + run_id='run_12345', + overall_status='Pass', + overall_status_bgcolor='#00FF00', + total_rules_executed=10, + total_passed_rules=9, + total_failed_rules=1, + competency_metrics_slack=[], + competency_metrics=[], + criticality_metrics=[] + ) + print("calling the send mail to the users") + # self.send_mail(html_output, "test", "aaaalfyofqi7i7nxuvxlboxbym@nike.org.slack.com") + + print("print the html data") + print(html_output) + + except Exception as e: + print(f"Error in get_report_data: {e}") + traceback.print_exc() \ No newline at end of file diff --git a/spark_expectations/utils/report.py b/spark_expectations/utils/report.py new file mode 100644 index 0000000..fcc71c4 --- /dev/null +++ b/spark_expectations/utils/report.py @@ -0,0 +1,90 @@ + +from dataclasses import dataclass +from typing import Dict, Optional, Tuple, List +from datetime import datetime, timezone +from pyspark.sql import DataFrame +from pyspark.sql.functions import ( + lit, + expr, + when, + array, + to_timestamp, + round as sql_round, + create_map, + explode, + to_json, + col, + split, + current_date, + get_json_object +) +from pyspark.sql.types import StructType +from spark_expectations import _log +from spark_expectations.core.exceptions import ( + 
SparkExpectationsUserInputOrConfigInvalidException, + SparkExpectationsMiscException, +) +from spark_expectations.secrets import SparkExpectationsSecretsBackend +from spark_expectations.utils.udf import remove_empty_maps +from spark_expectations.core.context import SparkExpectationsContext +from spark_expectations.sinks import _sink_hook +from spark_expectations.config.user_config import Constants as user_config + +@dataclass +class SparkExpectationsReport: + """ + This class implements/supports writing data into the sink system + """ + + _context: SparkExpectationsContext + + def __post_init__(self) -> None: + self.spark = self._context.spark + + def dq_obs_report_data_insert(self, df_detailed: DataFrame, df_query_output: DataFrame): + try: + print("dq_obs_report_data_insert method called stats_detailed table") + # df_detailed.show(truncate=False) + df = df_detailed + # List of columns to be removed + columns_to_remove = [ + "target_dq_status", + "source_expectations", + "source_dq_actual_outcome", + "source_dq_expected_outcome", + "source_dq_start_time", + "source_dq_end_time", + "target_expectations", + "target_dq_actual_outcome", + "target_dq_expected_outcome", + "target_dq_actual_row_count", + "target_dq_error_row_count", + "target_dq_row_count", + "target_dq_start_time", + "target_dq_end_time", + "dq_job_metadata_info" + ] + # Rename the columns + df = df.withColumnRenamed("source_dq_row_count", "total_records") \ + .withColumnRenamed("source_dq_error_row_count", "failed_records") \ + .withColumnRenamed("source_dq_actual_row_count", "valid_records") + df = df.withColumn("dag_name", get_json_object(col("dq_job_metadata_info"), "$.job")) \ + .withColumn("Region_cd", get_json_object(col("dq_job_metadata_info"), "$.Region")) \ + .withColumn("env", get_json_object(col("dq_job_metadata_info"), "$.env")) + + # Calculate the success percentage and add it as a new column + df = df.withColumn("success_percentage", (col("valid_records") / col("total_records")) * 100) + + # Create a new DataFrame by dropping the specified columns + print("This is the table ") + new_df = df.drop(*columns_to_remove) + # Save the DataFrame to a table + new_df.write.mode("overwrite").saveAsTable("dq_obs_report_data") + return new_df + except Exception as e: + raise SparkExpectationsMiscException( + f"An error occurred in dq_obs_report_data_insert: {e}" + ) + + + diff --git a/spark_expectations/utils/templates/advanced_email_alert_template.jinja b/spark_expectations/utils/templates/advanced_email_alert_template.jinja new file mode 100644 index 0000000..dfc2258 --- /dev/null +++ b/spark_expectations/utils/templates/advanced_email_alert_template.jinja @@ -0,0 +1,228 @@ + + + + + + + {{ title }} + + + + + +
+{# advanced_email_alert_template.jinja: HTML layout for the data quality report e-mail. #}
+{# It renders {{ title }}, a run summary block (product_id or data_object_name depending on persona, #}
+{# snapshot_date, region_code, plus dag_name and run_id for the 'engineer' persona), an overall-status #}
+{# section showing overall_status together with total_rules_executed, total_passed_rules and total_failed_rules, #}
+{# optional "Summary of Data Quality Competency" and "Summary of TC Criticality" sections driven by #}
+{# competency_metrics, competency_metrics_slack and criticality_metrics (sorted by sort_number), and a #}
+{# detailed results table built from `columns` and `table_rows`, highlighting cells equal to 'fail'/'FAIL'. #}
+{# Layout branches on `communication_engine` ('outlook' vs. other clients); the full markup is omitted here. #}
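
For orientation, the sketch below is an editor's minimal stand-in, not the advanced_email_alert_template.jinja shipped in this patch. It uses only the context variables that AlertTrial.get_report_data already passes to template.render() in spark_expectations/utils/alert.py (title, columns, table_rows, product_id, data_object_name, snapshot_date, region_code, dag_name, run_id, overall_status, overall_status_bgcolor and the rule counters), and it leaves out the Outlook/Slack-specific styling and the competency/criticality sections of the real template.

```jinja
{# Editor's illustrative sketch only, not the template shipped in this patch.      #}
{# It assumes nothing beyond the keyword arguments that AlertTrial.get_report_data #}
{# passes to template.render() in spark_expectations/utils/alert.py.               #}
<html>
  <body>
    <h2>{{ title }}</h2>
    <p>
      Summary of Data quality execution for the product id : {{ product_id }}<br>
      Data object : {{ data_object_name }}<br>
      Snapshot Date : {{ snapshot_date }} | Region Code : {{ region_code }}<br>
      DAG Name : {{ dag_name }} | run_id : {{ run_id }}
    </p>
    <p style="background-color: {{ overall_status_bgcolor }};">
      Overall Status : {{ overall_status }} |
      Passed : {{ total_passed_rules }} / Failed : {{ total_failed_rules }}
      (Total DQ Rules : {{ total_rules_executed }})
    </p>
    <table border="1" cellpadding="4" cellspacing="0">
      <tr>
        {% for column in columns %}
        <th>{{ column.replace('-', ' ').replace('_', ' ').title() }}</th>
        {% endfor %}
      </tr>
      {% for row in table_rows %}
      <tr>
        {% for cell in row %}
        {# shade failed cells, mirroring the 'fail'/'FAIL' check in the original template #}
        <td {% if cell in ('fail', 'FAIL') %}style="background-color: #ffcccc;"{% endif %}>{{ cell }}</td>
        {% endfor %}
      </tr>
      {% endfor %}
    </table>
  </body>
</html>
```

Rendered through the Environment / FileSystemLoader / get_template calls already present in alert.py, this produces a plain HTML report in which failed cells are shaded; the real template layers persona- and communication_engine-specific markup on top of the same data.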