Updated Referential Integrity to support multiple columns (#463)
- The change updates the API to accept a list of columns for both the primary and reference DataFrames (a usage sketch follows the changed-files summary below).
- This is still an experimental utility, which is why we are comfortable changing its API.
- Added tests to verify the updated behavior.
rdsharma26 committed Apr 16, 2024
1 parent dc64d49 commit 1ce5b3e
Showing 2 changed files with 250 additions and 71 deletions.
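
For orientation, here is a minimal sketch of how the updated utility might be called after this change. Only the subsetCheck signature is taken from the diff below; the Spark session, DataFrames, column names, and threshold are hypothetical.

import com.amazon.deequ.comparison.ReferentialIntegrity
import org.apache.spark.sql.SparkSession

// Hypothetical data: every (customerId, country) pair in `orders` should also
// appear as an (id, countryCode) pair in `customers`.
val spark = SparkSession.builder()
  .appName("referential-integrity-example")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val orders = Seq((1, "US"), (2, "DE"), (3, "FR")).toDF("customerId", "country")
val customers = Seq((1, "US"), (2, "DE")).toDF("id", "countryCode")

// Both sides now take a Seq of column names instead of a single column name.
// The result is a ComparisonResult; missing columns are reported via
// ComparisonFailed, as shown in the diff below.
val result = ReferentialIntegrity.subsetCheck(
  primary = orders,
  primaryColumns = Seq("customerId", "country"),
  reference = customers,
  referenceColumns = Seq("id", "countryCode"),
  assertion = (ratio: Double) => ratio >= 0.6 // 2 of 3 primary rows are covered, ratio ~ 0.67
)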
src/main/scala/com/amazon/deequ/comparison/ReferentialIntegrity.scala
@@ -17,46 +17,57 @@
 package com.amazon.deequ.comparison
 
 import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.col
 
+import scala.util.Try
+
 object ReferentialIntegrity {
 
   /**
-   * Checks to what extent a column from a DataFrame is a subset of another column
+   * Checks to what extent a set of columns from a DataFrame is a subset of another set of columns
    * from another DataFrame.
    *
    * This is an experimental utility.
    *
-   * @param primary      The primary data set which contains the column which the customer
-   *                     will select the column to do the Referential Integrity check.
-   * @param primaryCol   The name of the column selected from the primary data set.
-   * @param reference    The reference data set which contains the possible values for the column
-   *                     from the primary dataset.
-   * @param referenceCol The name of the column selected from the reference data set, which
-   *                     contains those values.
-   * @param assertion    A function which accepts the match ratio and returns a Boolean.
+   * @param primary          The primary data set which contains the columns which the customer
+   *                         will select to do the Referential Integrity check.
+   * @param primaryColumns   The names of the columns selected from the primary data set.
+   * @param reference        The reference data set which contains the possible values for the columns
+   *                         from the primary dataset.
+   * @param referenceColumns The names of the columns selected from the reference data set, which
+   *                         contains those values.
+   * @param assertion        A function which accepts the match ratio and returns a Boolean.
    *
    * @return Boolean Internally we calculate the referential integrity as a
    *                 ratio, and we run the assertion on that outcome
    *                 that ends up being a true or false response.
    */
 
   def subsetCheck(primary: DataFrame,
-                  primaryCol: String,
+                  primaryColumns: Seq[String],
                   reference: DataFrame,
-                  referenceCol: String,
+                  referenceColumns: Seq[String],
                   assertion: Double => Boolean): ComparisonResult = {
-    val primaryCount = primary.count()
+    val primaryColumnsNotInDataset = primaryColumns.filterNot(c => Try(primary(c)).isSuccess)
+    val referenceColumnsNotInDataset = referenceColumns.filterNot(c => Try(reference(c)).isSuccess)
 
-    if (!primary.columns.contains(primaryCol)) {
-      ComparisonFailed(s"Column $primaryCol does not exist in primary data frame.")
-    } else if (!reference.columns.contains(referenceCol)) {
-      ComparisonFailed(s"Column $referenceCol does not exist in reference data frame.")
-    } else if (primaryCount == 0) {
+    if (primaryColumnsNotInDataset.nonEmpty) {
+      primaryColumnsNotInDataset match {
+        case Seq(c) => ComparisonFailed(s"Column $c does not exist in primary data frame.")
+        case cols => ComparisonFailed(s"Columns ${cols.mkString(", ")} do not exist in primary data frame.")
+      }
+    } else if (referenceColumnsNotInDataset.nonEmpty) {
+      referenceColumnsNotInDataset match {
+        case Seq(c) => ComparisonFailed(s"Column $c does not exist in reference data frame.")
+        case cols => ComparisonFailed(s"Columns ${cols.mkString(", ")} do not exist in reference data frame.")
+      }
+    } else if (primary.head(1).isEmpty) {
       ComparisonFailed(s"Primary data frame contains no data.")
     } else {
-      val primarySparkCol = primary.select(primaryCol)
-      val referenceSparkCol = reference.select(referenceCol)
-      val mismatchCount = primarySparkCol.except(referenceSparkCol).count()
+      val primaryCount = primary.count()
+      val primarySparkCols = primary.select(primaryColumns.map(col): _*)
+      val referenceSparkCols = reference.select(referenceColumns.map(col): _*)
+      val mismatchCount = primarySparkCols.except(referenceSparkCols).count()
 
       val ratio = if (mismatchCount == 0) 1.0 else (primaryCount - mismatchCount).toDouble / primaryCount
 
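For intuition, a tiny worked example of the ratio computed in the hunk above; the counts are made up.

// Hypothetical counts: 5 rows in the primary frame, and except() leaves one
// column combination with no match in the reference frame.
val primaryCount = 5L
val mismatchCount = 1L
val ratio = if (mismatchCount == 0) 1.0 else (primaryCount - mismatchCount).toDouble / primaryCount
// ratio == 0.8, so an assertion requiring at least 0.9 fails while one requiring 0.8 passes.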