Skip to content

Commit

Permalink
Merge branch 'master' into fergonp-dataset-comparison
Browse files Browse the repository at this point in the history
  • Loading branch information
rdsharma26 authored Feb 8, 2023
2 parents 0580d57 + ef4c308 commit e05caaa
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 30 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Java CI with Maven

# Run the build for pushes to master and for pull requests targeting master.
on:
  push:
    branches: [ "master" ]
  pull_request:
    branches: [ "master" ]

jobs:
  build:

    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v3
    - name: Set up JDK 8
      uses: actions/setup-java@v3
      with:
        java-version: '8'
        distribution: 'corretto'
        # Cache the local Maven repository between runs.
        cache: maven
    - name: Build with Maven
      run: mvn clean test

7 changes: 4 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
.idea/
*.iml
**/*.iml
target/.travis/public-signing-key.gpg
target/
.DS_Store

.metals/
.vscode/
.bloop/
.DS_Store
10 changes: 8 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.amazon.deequ</groupId>
<artifactId>deequ</artifactId>
<version>2.0.0-spark-3.2</version>
<version>2.0.2-spark-3.3</version>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
Expand All @@ -18,7 +18,7 @@
<artifact.scala.version>${scala.major.version}</artifact.scala.version>
<scala-maven-plugin.version>4.4.0</scala-maven-plugin.version>

<spark.version>3.2.1</spark.version>
<spark.version>3.3.0</spark.version>
</properties>

<name>deequ</name>
Expand Down Expand Up @@ -94,6 +94,12 @@
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.major.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze_${scala.major.version}</artifactId>
Expand Down
263 changes: 238 additions & 25 deletions src/main/scala/com/amazon/deequ/analyzers/Distance.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,47 @@
*/

package com.amazon.deequ.analyzers
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.stat.Statistics._
import org.apache.spark.mllib.stat.test.ChiSqTestResult





object Distance {

/** Calculate distance of numerical profiles based on KLL Sketches and L-Infinity Distance */
// Chi-square constants.
// At least two distinct categories are required to run the chi-square test
// for a categorical variable.
private val chisquareMinDimension: Int = 2

// For tables larger than 2 x 2: "No more than 20% of the expected counts are less than 5
// and all individual expected counts are 1 or greater"
// (Yates, Moore & McCabe, 1999, The Practice of Statistics, p. 734).
// The absolute threshold is the "5", the percentage threshold is the "20%".
private val defaultAbsThresholdYates: Integer = 5
private val defaultPercThresholdYates: Double = 0.2

// For 2 x 2 tables: all expected counts should be 10 or greater
// (Cochran, William G. "The χ2 test of goodness of fit."
// The Annals of Mathematical Statistics (1952): 315-345).
private val defaultAbsThresholdCochran: Integer = 10

// Default c(alpha) value, corresponding to an alpha value of 0.003:
// Eq. (15) in Section 3.3.1 of Knuth, D.E., The Art of Computer Programming,
// Volume 2 (Seminumerical Algorithms), 3rd Edition, Addison Wesley, Reading Mass, 1998.
// Used by selectMetrics when the caller supplies no alpha.
private val defaultCAlpha : Double = 1.8

/** Selects the algorithm that `categoricalDistance` uses to compare two categorical profiles. */
trait CategoricalDistanceMethod
/** L-Infinity distance between the relative category frequencies.
  *
  * @param alpha optional significance level; when set, the critical value used for the
  *              low-sample correction is derived from it, otherwise `defaultCAlpha` is used
  */
case class LInfinityMethod(alpha: Option[Double] = None) extends CategoricalDistanceMethod
/** Chi-square test of the observed sample against the expected baseline.
  *
  * @param absThresholdYates   Yates absolute threshold for tables larger than 2x2
  * @param percThresholdYates  Yates percentage of categories that can be below the threshold
  *                            for tables larger than 2x2
  * @param absThresholdCochran Cochran absolute threshold for 2x2 tables
  */
case class ChisquareMethod(
absThresholdYates: Integer = defaultAbsThresholdYates,
percThresholdYates: Double = defaultPercThresholdYates,
absThresholdCochran: Integer = defaultAbsThresholdCochran)
extends CategoricalDistanceMethod

/** Calculate distance of numerical profiles based on KLL Sketches and L-Infinity Distance */
def numericalDistance(
sample1: QuantileNonSample[Double],
sample2: QuantileNonSample[Double],
correctForLowNumberOfSamples: Boolean = false)
correctForLowNumberOfSamples: Boolean = false,
alpha: Option[Double] = None)
: Double = {
val rankMap1 = sample1.getRankMap()
val rankMap2 = sample2.getRankMap()
Expand All @@ -37,50 +70,230 @@ object Distance {
val cdfDiff = Math.abs(cdf1 - cdf2)
linfSimple = Math.max(linfSimple, cdfDiff)
}
selectMetrics(linfSimple, n, m, correctForLowNumberOfSamples)
selectMetrics(linfSimple, n, m, correctForLowNumberOfSamples, alpha)
}

/** Calculate distance of categorical profiles based on L-Infinity Distance */
def categoricalDistance(
sample1: scala.collection.mutable.Map[String, Long],
sample2: scala.collection.mutable.Map[String, Long],
correctForLowNumberOfSamples: Boolean = false)
: Double = {
/** Compare two categorical profiles with the selected distance method.
  *
  * The work is delegated according to `method`:
  *  - `LInfinityMethod`: L-Infinity distance between the relative category frequencies,
  *    using the method's optional `alpha`.
  *  - `ChisquareMethod`: chi-square test of `sample1` (observed) against `sample2`
  *    (expected baseline), using the thresholds carried by the method instance.
  *    Cochran's threshold applies to 2x2 tables, Yates' thresholds to larger tables.
  *
  * @param sample1 the mapping between categories(keys) and counts(values) of the observed sample
  * @param sample2 the mapping between categories(keys) and counts(values) of the expected baseline
  * @param correctForLowNumberOfSamples for the chi-square method: if true the chi-square
  *                                     statistic is returned, otherwise the p-value;
  *                                     for the L-Infinity method: if true the plain
  *                                     L-Infinity distance is returned, otherwise the
  *                                     distance reduced by the sample-size dependent term
  * @param method Method to use: `LInfinityMethod` (default) or `ChisquareMethod`
  * @return a distance, a chi-square statistic or a p-value, depending on `method` and
  *         `correctForLowNumberOfSamples`
  */
def categoricalDistance(
    sample1: scala.collection.mutable.Map[String, Long],
    sample2: scala.collection.mutable.Map[String, Long],
    correctForLowNumberOfSamples: Boolean = false,
    method: CategoricalDistanceMethod = LInfinityMethod())
  : Double = method match {
  case lInfinity: LInfinityMethod =>
    categoricalLInfinityDistance(sample1, sample2, correctForLowNumberOfSamples, lInfinity.alpha)
  case chiSquare: ChisquareMethod =>
    categoricalChiSquareTest(
      sample = sample1,
      expected = sample2,
      correctForLowNumberOfSamples = correctForLowNumberOfSamples,
      absThresholdYates = chiSquare.absThresholdYates,
      percThresholdYates = chiSquare.percThresholdYates,
      absThresholdCochran = chiSquare.absThresholdCochran)
}

/** Calculate distance of categorical profiles based on the chi-square test or statistic.
  *
  * The expected counts are rescaled to the observed total, small categories are regrouped
  * (see `regroupCategories`) and the chi-square test is run on what remains.
  *
  * for 2x2 tables: all expected counts should be 10 or greater (Cochran, William G.
  * "The χ2 test of goodness of fit." The Annals of mathematical statistics (1952): 315-345.)
  * for tables larger than 2 x 2: "No more than 20% of the expected counts are less than 5 and
  * all individual expected counts are 1 or greater" (Yates, Moore & McCabe, 1999,
  * The Practice of Statistics, p. 734)
  *
  * @param sample the mapping between categories(keys) and counts(values) of the observed sample
  * @param expected the mapping between categories(keys) and counts(values) of the expected baseline
  * @param correctForLowNumberOfSamples if true returns chi-square statistics otherwise p-value
  * @param absThresholdYates Yates absolute threshold for tables larger than 2x2
  * @param percThresholdYates Yates percentage of categories that can be below threshold
  *                           for tables larger than 2x2
  * @param absThresholdCochran Cochran absolute threshold for 2x2 tables
  * @param normalizeExpected currently not consulted: the expected counts are always normalized;
  *                          kept so existing call sites remain valid
  * @return the chi-square statistic or the p-value depending on `correctForLowNumberOfSamples`,
  *         or `Double.NaN` when fewer than two categories remain after regrouping
  */
private[this] def categoricalChiSquareTest(
    sample: scala.collection.mutable.Map[String, Long],
    expected: scala.collection.mutable.Map[String, Long],
    correctForLowNumberOfSamples: Boolean = false,
    absThresholdYates : Integer = defaultAbsThresholdYates ,
    percThresholdYates : Double = defaultPercThresholdYates,
    absThresholdCochran : Integer = defaultAbsThresholdCochran,
    normalizeExpected : Boolean = true)
  : Double = {

  // Only categories that also exist in the baseline take part in the test.
  val sampleSum: Double =
    sample.iterator.collect { case (category, count) if expected.contains(category) => count }.sum
  val expectedSum: Double = expected.values.sum

  // Normalize the expected input, normalization is required to conduct the chi-square test.
  // While normalization is already included in the mllib chi-square test, we perform
  // normalization manually so that regrouping operates on the rescaled counts.
  // https://spark.apache.org/docs/3.1.3/api/scala/org/apache/spark/mllib/stat/Statistics$.html#chiSqTest:org.apache.spark.mllib.stat.test.ChiSqTestResult
  val expectedNorm: scala.collection.mutable.Map[String, Double] =
    expected.map { case (category, count) => (category, count / expectedSum * sampleSum) }

  // Regroup categories if necessary depending on thresholds (works on fresh copies,
  // so the caller's maps are never mutated).
  val (regroupedSample, regroupedExpected) = regroupCategories(
    sample.map { case (category, count) => (category, count.toDouble) },
    expectedNorm,
    absThresholdYates,
    percThresholdYates,
    absThresholdCochran)

  // The test vectors are built from the keys of the expected mapping, so the number of
  // categories actually tested is the size of `regroupedExpected`. Categories that only
  // occur in the sample must not count towards the minimum dimension.
  if (regroupedExpected.size < chisquareMinDimension) {
    Double.NaN
  } else {
    // run chi-square test and return statistics or p-value
    val result = chiSquareTest(regroupedSample, regroupedExpected)
    if (correctForLowNumberOfSamples) result.statistic else result.pValue
  }
}

/** Regroup categories with elements below threshold, required for chi-square test.
  *
  * While too many expected counts are below the applicable threshold, the category with the
  * smallest expected count is removed and its expected and observed counts are added to the
  * category with the next smallest expected count. The thresholds are re-evaluated after every
  * merge, because the number of remaining categories decides which rule applies.
  *
  * for 2x2 tables: all expected counts should be 10 or greater (Cochran, William G.
  * "The χ2 test of goodness of fit." The Annals of mathematical statistics (1952): 315-345.)
  * for tables larger than 2 x 2: "No more than 20% of the expected counts are less than 5 and
  * all individual expected counts are 1 or greater" (Yates, Moore & McCabe, 1999,
  * The Practice of Statistics, p. 734)
  *
  * Note: both maps are modified in place; pass copies if the originals must be preserved.
  *
  * @param sample the mapping between categories(keys) and counts(values) of the observed sample
  * @param expected the mapping between categories(keys) and counts(values) of the expected baseline
  * @param absThresholdYates Yates absolute threshold for tables larger than 2x2
  * @param percThresholdYates Yates percentage of categories that can be below threshold
  *                           for tables larger than 2x2
  * @param absThresholdCochran Cochran absolute threshold for 2x2 tables
  * @return (sample, expected) returns the two regrouped mappings
  */
@scala.annotation.tailrec
private[this] def regroupCategories(
    sample: scala.collection.mutable.Map[String, Double],
    expected: scala.collection.mutable.Map[String, Double],
    absThresholdYates: Integer = defaultAbsThresholdYates,
    percThresholdYates: Double = defaultPercThresholdYates,
    absThresholdCochran: Integer = defaultAbsThresholdCochran)
  : (scala.collection.mutable.Map[String, Double], scala.collection.mutable.Map[String, Double]) = {

  val nbCategories = expected.size

  // If number of categories is below the minimum return original mappings
  if (nbCategories < chisquareMinDimension) {
    (sample, expected)
  } else {
    // Determine thresholds depending on dimensions of mapping
    // (2x2 tables use Cochran, all other tables Yates thresholds)
    val useYates = nbCategories > chisquareMinDimension
    val absThresholdPerColumn: Int =
      if (useYates) absThresholdYates.intValue else absThresholdCochran.intValue
    val maxNbColumnsBelowThreshold: Int =
      if (useYates) (percThresholdYates * nbCategories).toInt else 0

    // Count number of categories below threshold
    val nbExpectedColumnsBelowThreshold =
      expected.count { case (_, count) => count < absThresholdPerColumn }

    if (nbExpectedColumnsBelowThreshold > maxNbColumnsBelowThreshold) {
      // Take out the category with the smallest expected count ...
      val (minKey, minExpected) = expected.minBy(_._2)
      val minSample: Double = sample.getOrElse(minKey, 0.0)
      expected.remove(minKey)
      sample.remove(minKey)

      // ... and fold its counts into the category that is now the smallest.
      val (nextKey, nextExpected) = expected.minBy(_._2)
      val nextSample: Double = sample.getOrElse(nextKey, 0.0)
      expected.update(nextKey, nextExpected + minExpected)
      sample.update(nextKey, minSample + nextSample)

      // Repeat until the mappings are valid
      regroupCategories(sample, expected, absThresholdYates, percThresholdYates, absThresholdCochran)
    } else {
      // The mappings are valid and returned as they are
      (sample, expected)
    }
  }
}


/** Runs chi-square test on two mappings.
  *
  * The categories of `expected` define the test vectors; a category missing from `sample`
  * is treated as an observed count of 0.0, and categories that only occur in `sample`
  * are ignored.
  *
  * @param sample the mapping between categories(keys) and counts(values) of the observed sample
  * @param expected the mapping between categories(keys) and counts(values) of the expected baseline
  * @return ChiSqTestResult returns the chi-square test result object
  *         (contains both statistics and p-value)
  */
private[this] def chiSquareTest(
    sample: scala.collection.mutable.Map[String, Double],
    expected: scala.collection.mutable.Map[String, Double])
  : ChiSqTestResult = {

  // Fix the category order once so both vectors are aligned position by position.
  val categories: Array[String] = expected.keySet.toArray

  val sampleArray: Array[Double] = categories.map(category => sample.getOrElse(category, 0.0))
  val expectedArray: Array[Double] = categories.map(category => expected(category))

  val vecSample: Vector = Vectors.dense(sampleArray)
  val vecExpected: Vector = Vectors.dense(expectedArray)

  Statistics.chiSqTest(vecSample, vecExpected)
}

/** L-Infinity distance between two categorical profiles.
  *
  * Every count is divided by the total of its own sample; the distance is the largest
  * absolute difference between these relative frequencies over all categories that occur
  * in either sample (a category missing from a sample counts as 0). The result is then
  * passed through `selectMetrics`.
  *
  * @param sample1 the mapping between categories(keys) and counts(values) of the first sample
  * @param sample2 the mapping between categories(keys) and counts(values) of the second sample
  * @param correctForLowNumberOfSamples forwarded to `selectMetrics`
  * @param alpha optional significance level, forwarded to `selectMetrics`
  * @return the value chosen by `selectMetrics`
  */
private[this] def categoricalLInfinityDistance(
    sample1: scala.collection.mutable.Map[String, Long],
    sample2: scala.collection.mutable.Map[String, Long],
    correctForLowNumberOfSamples: Boolean = false,
    alpha: Option[Double])
  : Double = {
  // Totals are accumulated as Double, exactly as the counts are consumed below.
  val total1: Double = sample1.foldLeft(0.0) { case (acc, (_, count)) => acc + count }
  val total2: Double = sample2.foldLeft(0.0) { case (acc, (_, count)) => acc + count }

  val allCategories = sample1.keySet.union(sample2.keySet)

  val largestGap: Double = allCategories.foldLeft(0.0) { (worst, category) =>
    val share1 = sample1.getOrElse(category, 0L) / total1
    val share2 = sample2.getOrElse(category, 0L) / total2
    Math.max(worst, Math.abs(share1 - share2))
  }

  selectMetrics(largestGap, total1, total2, correctForLowNumberOfSamples, alpha)
}

/** Select which metric to return (linf_simple or linf_robust).
  *
  * The robust variant follows the two-sample Kolmogorov–Smirnov test: the simple distance is
  * reduced by `c(alpha) * sqrt((n + m) / (n * m))` and floored at 0.0.
  * Reference: https://en.m.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
  *
  * @param linfSimple the plain L-Infinity distance
  * @param n size of the first sample
  * @param m size of the second sample
  * @param correctForLowNumberOfSamples if true `linfSimple` is returned unchanged, otherwise
  *                                     the robust value is returned
  *                                     NOTE(review): the flag name suggests the opposite
  *                                     polarity; behaviour is kept as callers rely on it
  * @param alpha optional significance level; `c(alpha) = sqrt(-ln(alpha / 2) / 2)` when set,
  *              otherwise `defaultCAlpha` is used
  * @return `linfSimple` or the robust distance
  */
private[this] def selectMetrics(
    linfSimple: Double,
    n: Double,
    m: Double,
    correctForLowNumberOfSamples: Boolean = false,
    alpha: Option[Double])
  : Double = {
  if (correctForLowNumberOfSamples) {
    linfSimple
  } else {
    // Critical value of the two-sample Kolmogorov–Smirnov test for the requested alpha.
    val cAlpha: Double = alpha.fold(defaultCAlpha)(a => Math.sqrt(-Math.log(a / 2) / 2))
    Math.max(0.0, linfSimple - cAlpha * Math.sqrt((n + m) / (n * m)))
  }
}
Expand Down
Loading

0 comments on commit e05caaa

Please sign in to comment.