From 1c1afdfb12782d918a425e4906715bca086a5e3b Mon Sep 17 00:00:00 2001
From: Edward Cho <114528615+eycho-am@users.noreply.github.com>
Date: Thu, 13 Apr 2023 16:27:57 -0400
Subject: [PATCH] Feature: Length Row Level Results (#465)

- Add row level results for MinLength in addition to MaxLength
- Add AnalyzerOptions case class with options to change null behavior (Ignore, EmptyString, Fail)
---
 .../com/amazon/deequ/VerificationResult.scala |  9 +-
 .../com/amazon/deequ/analyzers/Analyzer.scala | 38 +++++++-
 .../amazon/deequ/analyzers/MaxLength.scala    | 36 +++++--
 .../amazon/deequ/analyzers/MinLength.scala    | 37 ++++++--
 .../com/amazon/deequ/analyzers/Minimum.scala  |  6 +-
 .../scala/com/amazon/deequ/checks/Check.scala | 15 +--
 .../amazon/deequ/constraints/Constraint.scala | 18 +++-
 .../amazon/deequ/VerificationSuiteTest.scala  | 83 +++++++++++++++--
 .../amazon/deequ/analyzers/AnalysisTest.scala | 10 +-
 .../deequ/analyzers/MaxLengthTest.scala       | 26 ++++++
 .../deequ/analyzers/MinLengthTest.scala       | 93 +++++++++++++++++++
 .../deequ/analyzers/StateProviderTest.scala   | 20 +++-
 .../repository/AnalysisResultSerdeTest.scala  |  8 +-
 .../amazon/deequ/utils/FixtureSupport.scala   | 11 +++
 14 files changed, 358 insertions(+), 52 deletions(-)
 create mode 100644 src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala

diff --git a/src/main/scala/com/amazon/deequ/VerificationResult.scala b/src/main/scala/com/amazon/deequ/VerificationResult.scala
index c4c4ea869..e0a328aa8 100644
--- a/src/main/scala/com/amazon/deequ/VerificationResult.scala
+++ b/src/main/scala/com/amazon/deequ/VerificationResult.scala
@@ -18,17 +18,18 @@ package com.amazon.deequ
 
 import com.amazon.deequ.analyzers.Analyzer
 import com.amazon.deequ.analyzers.runners.AnalyzerContext
-import com.amazon.deequ.checks.{Check, CheckResult, CheckStatus}
-import com.amazon.deequ.constraints.AnalysisBasedConstraint
+import com.amazon.deequ.checks.Check
+import com.amazon.deequ.checks.CheckResult
+import com.amazon.deequ.checks.CheckStatus
 import com.amazon.deequ.constraints.ConstraintResult
-import com.amazon.deequ.constraints.NamedConstraint
 import com.amazon.deequ.constraints.RowLevelAssertedConstraint
 import com.amazon.deequ.constraints.RowLevelConstraint
 import com.amazon.deequ.metrics.FullColumn
 import com.amazon.deequ.metrics.Metric
 import com.amazon.deequ.repository.SimpleResultSerde
 import org.apache.spark.sql.Column
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.SparkSession
 
 /**
   * The result returned from the VerificationSuite
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
index eae17459f..1dd0bf248 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
@@ -17,16 +17,23 @@ package com.amazon.deequ.analyzers
 
 import com.amazon.deequ.analyzers.Analyzers._
-import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric}
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
+import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
 import com.amazon.deequ.analyzers.runners._
 import com.amazon.deequ.metrics.FullColumn
 import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn
+import com.amazon.deequ.metrics.DoubleMetric
+import com.amazon.deequ.metrics.Entity
+import com.amazon.deequ.metrics.Metric
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
 import scala.language.existentials
-import scala.util.{Failure, Success}
+import scala.util.Failure
+import scala.util.Success
 
 /**
   * A state (sufficient statistic) computed from data, from which we can compute a metric.
@@ -249,6 +256,13 @@ case class NumMatchesAndCount(numMatches: Long, count: Long, override val fullCo
   }
 }
 
+case class AnalyzerOptions(nullBehavior: NullBehavior = NullBehavior.Ignore)
+
+object NullBehavior extends Enumeration {
+  type NullBehavior = Value
+  val Ignore, EmptyString, Fail = Value
+}
+
 /** Base class for analyzers that compute ratios of matching predicates */
 abstract class PredicateMatchingAnalyzer(
     name: String,
@@ -453,6 +467,20 @@ private[deequ] object Analyzers {
     conditionalSelection(col(selection), where)
   }
 
+  def conditionalSelection(selection: Column, where: Option[String], replaceWith: Double): Column = {
+    val conditionColumn = where.map(expr)
+    conditionColumn
+      .map { condition => when(condition, replaceWith).otherwise(selection) }
+      .getOrElse(selection)
+  }
+
+  def conditionalSelection(selection: Column, where: Option[String], replaceWith: String): Column = {
+    val conditionColumn = where.map(expr)
+    conditionColumn
+      .map { condition => when(condition, replaceWith).otherwise(selection) }
+      .getOrElse(selection)
+  }
+
   def conditionalSelection(selection: Column, condition: Option[String]): Column = {
     val conditionColumn = condition.map { expression => expr(expression) }
     conditionalSelectionFromColumns(selection, conditionColumn)
diff --git a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala
index 8aa00451e..2ad9a8ab6 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala
@@ -17,22 +17,28 @@ package com.amazon.deequ.analyzers
 
 import com.amazon.deequ.analyzers.Analyzers._
-import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isString}
-import org.apache.spark.sql.functions.{length, max}
-import org.apache.spark.sql.types.{DoubleType, StructType}
-import org.apache.spark.sql.{Column, Row}
+import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
+import com.amazon.deequ.analyzers.Preconditions.hasColumn
+import com.amazon.deequ.analyzers.Preconditions.isString
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.length
+import org.apache.spark.sql.functions.max
+import org.apache.spark.sql.types.DoubleType
+import org.apache.spark.sql.types.StructType
 
-case class MaxLength(column: String, where: Option[String] = None)
+case class MaxLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None)
   extends StandardScanShareableAnalyzer[MaxState]("MaxLength", column)
   with FilterableAnalyzer {
 
   override def aggregationFunctions(): Seq[Column] = {
-    max(criterion) :: Nil
+    max(criterion(getNullBehavior)) :: Nil
   }
 
   override def fromAggregationResult(result: Row, offset: Int): Option[MaxState] = {
     ifNoNullsIn(result, offset) { _ =>
-      MaxState(result.getDouble(offset), Some(criterion))
+      MaxState(result.getDouble(offset), Some(criterion(getNullBehavior)))
     }
   }
 
@@ -42,5 +48,19 @@ case class MaxLength(column: String, where: Option[String] = None)
 
   override def filterCondition: Option[String] = where
 
-  private def criterion: Column = length(conditionalSelection(column, where)).cast(DoubleType)
+  private def criterion(nullBehavior: NullBehavior): Column = {
+    nullBehavior match {
+      case NullBehavior.Fail =>
+        val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType)
+        conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MaxValue)
+      case NullBehavior.EmptyString =>
+        length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType)
+      case _ => length(conditionalSelection(column, where)).cast(DoubleType)
+    }
+  }
+  private def getNullBehavior: NullBehavior = {
+    analyzerOptions
+      .map { options => options.nullBehavior }
+      .getOrElse(NullBehavior.Ignore)
+  }
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala
index dc123003a..f2c2849a8 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala
@@ -17,22 +17,28 @@ package com.amazon.deequ.analyzers
 
 import com.amazon.deequ.analyzers.Analyzers._
-import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isString}
-import org.apache.spark.sql.functions.{length, min}
-import org.apache.spark.sql.types.{DoubleType, StructType}
-import org.apache.spark.sql.{Column, Row}
+import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
+import com.amazon.deequ.analyzers.Preconditions.hasColumn
+import com.amazon.deequ.analyzers.Preconditions.isString
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.length
+import org.apache.spark.sql.functions.min
+import org.apache.spark.sql.types.DoubleType
+import org.apache.spark.sql.types.StructType
 
-case class MinLength(column: String, where: Option[String] = None)
+case class MinLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None)
   extends StandardScanShareableAnalyzer[MinState]("MinLength", column)
   with FilterableAnalyzer {
 
   override def aggregationFunctions(): Seq[Column] = {
-    min(length(conditionalSelection(column, where))).cast(DoubleType) :: Nil
+    min(criterion(getNullBehavior)) :: Nil
  }
 
   override def fromAggregationResult(result: Row, offset: Int): Option[MinState] = {
     ifNoNullsIn(result, offset) { _ =>
-      MinState(result.getDouble(offset))
+      MinState(result.getDouble(offset), Some(criterion(getNullBehavior)))
     }
   }
 
@@ -41,4 +47,21 @@ case class MinLength(column: String, where: Option[String] = None)
   }
 
   override def filterCondition: Option[String] = where
+
+  private[deequ] def criterion(nullBehavior: NullBehavior): Column = {
+    nullBehavior match {
+      case NullBehavior.Fail =>
+        val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType)
+        conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MinValue)
+      case NullBehavior.EmptyString =>
+        length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType)
+      case _ => length(conditionalSelection(column, where)).cast(DoubleType)
+    }
+  }
+
+  private def getNullBehavior: NullBehavior = {
+    analyzerOptions
+      .map { options => options.nullBehavior }
+      .getOrElse(NullBehavior.Ignore)
+  }
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Minimum.scala b/src/main/scala/com/amazon/deequ/analyzers/Minimum.scala
index 68ef926cf..6a3376ddd 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Minimum.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Minimum.scala
@@ -21,11 +21,13 @@ import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.functions.min
 import org.apache.spark.sql.types.{DoubleType, StructType}
 import Analyzers._
+import com.amazon.deequ.metrics.FullColumn
 
-case class MinState(minValue: Double) extends DoubleValuedState[MinState] {
+case class MinState(minValue: Double, override val fullColumn: Option[Column] = None)
+  extends DoubleValuedState[MinState] with FullColumn {
 
   override def sum(other: MinState): MinState = {
-    MinState(math.min(minValue, other.minValue))
+    MinState(math.min(minValue, other.minValue), sum(fullColumn, other.fullColumn))
   }
 
   override def metricValue(): Double = {
diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala
index 500326f52..856abed81 100644
--- a/src/main/scala/com/amazon/deequ/checks/Check.scala
+++ b/src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -16,16 +16,17 @@ package com.amazon.deequ.checks
 
+import com.amazon.deequ.analyzers.AnalyzerOptions
 import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetector, DataPoint}
 import com.amazon.deequ.analyzers.runners.AnalyzerContext
-import com.amazon.deequ.analyzers.{Analyzer, Histogram, Patterns, State, KLLParameters}
+import com.amazon.deequ.analyzers.{Analyzer, Histogram, KLLParameters, Patterns, State}
 import com.amazon.deequ.constraints.Constraint._
 import com.amazon.deequ.constraints._
 import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric}
 import com.amazon.deequ.repository.MetricsRepository
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import com.amazon.deequ.anomalydetection.HistoryUtils
-import com.amazon.deequ.checks.ColumnCondition.{isEachNotNull, isAnyNotNull}
+import com.amazon.deequ.checks.ColumnCondition.{isAnyNotNull, isEachNotNull}
 
 import scala.util.matching.Regex
@@ -516,10 +517,11 @@ case class Check(
   def hasMinLength(
       column: String,
       assertion: Double => Boolean,
-      hint: Option[String] = None)
+      hint: Option[String] = None,
+      analyzerOptions: Option[AnalyzerOptions] = None)
     : CheckWithLastConstraintFilterable = {
 
-    addFilterableConstraint { filter => minLengthConstraint(column, assertion, filter, hint) }
+    addFilterableConstraint { filter => minLengthConstraint(column, assertion, filter, hint, analyzerOptions) }
   }
 
   /**
@@ -533,10 +535,11 @@ case class Check(
   def hasMaxLength(
       column: String,
       assertion: Double => Boolean,
-      hint: Option[String] = None)
+      hint: Option[String] = None,
+      analyzerOptions: Option[AnalyzerOptions] = None)
     : CheckWithLastConstraintFilterable = {
 
-    addFilterableConstraint { filter => maxLengthConstraint(column, assertion, filter, hint) }
+    addFilterableConstraint { filter => maxLengthConstraint(column, assertion, filter, hint, analyzerOptions) }
   }
 
   /**
diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
index 0afff24f6..1ccf2ce41 100644
--- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
+++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
@@ -426,10 +426,11 @@ object Constraint {
       column: String,
       assertion: Double => Boolean,
       where: Option[String] = None,
-      hint: Option[String] = None)
+      hint: Option[String] = None,
+      analyzerOptions: Option[AnalyzerOptions] = None)
     : Constraint = {
 
-    val maxLength = MaxLength(column, where)
+    val maxLength = MaxLength(column, where, analyzerOptions)
 
     val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maxLength, assertion,
       hint = hint)
@@ -454,15 +455,22 @@ object Constraint {
       column: String,
       assertion: Double => Boolean,
       where: Option[String] = None,
-      hint: Option[String] = None)
+      hint: Option[String] = None,
+      analyzerOptions: Option[AnalyzerOptions] = None)
     : Constraint = {
 
-    val minLength = MinLength(column, where)
+    val minLength = MinLength(column, where, analyzerOptions)
 
     val constraint = AnalysisBasedConstraint[MinState, Double, Double](minLength, assertion,
      hint = hint)
 
-    new NamedConstraint(constraint, s"MinLengthConstraint($minLength)")
+    val sparkAssertion = org.apache.spark.sql.functions.udf(assertion)
+
+    new RowLevelAssertedConstraint(
+      constraint,
+      s"MinLengthConstraint($minLength)",
+      s"ColumnLength-$column",
+      sparkAssertion)
   }
 
   /**
diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
index 3a342f2f6..8f92e6066 100644
--- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
+++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
@@ -26,9 +26,9 @@ import com.amazon.deequ.constraints.Constraint
 import com.amazon.deequ.io.DfsUtils
 import com.amazon.deequ.metrics.DoubleMetric
 import com.amazon.deequ.metrics.Entity
-import com.amazon.deequ.repository.memory.InMemoryMetricsRepository
 import com.amazon.deequ.repository.MetricsRepository
 import com.amazon.deequ.repository.ResultKey
+import com.amazon.deequ.repository.memory.InMemoryMetricsRepository
 import com.amazon.deequ.utils.CollectionUtils.SeqExtensions
 import com.amazon.deequ.utils.FixtureSupport
 import com.amazon.deequ.utils.TempFileUtils
@@ -167,8 +167,10 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       val isComplete = new Check(CheckLevel.Error, "rule1").isComplete("att1")
       val completeness = new Check(CheckLevel.Error, "rule2").hasCompleteness("att2", _ > 0.7)
       val isPrimaryKey = new Check(CheckLevel.Error, "rule3").isPrimaryKey("item")
-      val minLength = new Check(CheckLevel.Error, "rule4").hasMaxLength("item", _ <= 3)
-      val maxLength = new Check(CheckLevel.Error, "rule5").hasMaxLength("item", _ > 1)
+      val minLength = new Check(CheckLevel.Error, "rule4")
+        .hasMinLength("item", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
+      val maxLength = new Check(CheckLevel.Error, "rule5")
+        .hasMaxLength("item", _ <= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
       val expectedColumn1 = isComplete.description
       val expectedColumn2 = completeness.description
       val expectedColumn3 = minLength.description
@@ -199,11 +201,78 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getBoolean(0))
       assert(Seq(true, true, false, true, false, true).sameElements(rowLevel2))
 
-      val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getAs[Boolean](0))
-      assert(Seq(true, true, true, false, false, false).sameElements(rowLevel3))
+      val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getBoolean(0))
+      assert(Seq(true, true, true, true, true, true).sameElements(rowLevel3))
+
+      val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getBoolean(0))
+      assert(Seq(true, false, false, false, false, false).sameElements(rowLevel4))
+    }
+
+    "generate a result that contains length row-level results with nullBehavior fail" in withSparkSession { session =>
+      val data = getDfCompleteAndInCompleteColumnsAndVarLengthStrings(session)
+
+      val minLength = new Check(CheckLevel.Error, "rule1")
+        .hasMinLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
+      val maxLength = new Check(CheckLevel.Error, "rule2")
+        .hasMaxLength("att2", _ <= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
+      val expectedColumn1 = minLength.description
+      val expectedColumn2 = maxLength.description
+
+      val suite = new VerificationSuite().onData(data)
+        .addCheck(minLength)
+        .addCheck(maxLength)
+
+      val result: VerificationResult = suite.run()
+
+      assert(result.status == CheckStatus.Error)
+
+      val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data)
+
+      resultData.show()
+      val expectedColumns: Set[String] =
+        data.columns.toSet + expectedColumn1 + expectedColumn2
+      assert(resultData.columns.toSet == expectedColumns)
+
+      val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getBoolean(0))
+      assert(Seq(true, true, false, true, false, true).sameElements(rowLevel1))
+
+      val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getBoolean(0))
+      assert(Seq(true, true, false, true, false, true).sameElements(rowLevel2))
+    }
+
+    "generate a result that contains length row-level results with nullBehavior empty" in withSparkSession { session =>
+      val data = getDfCompleteAndInCompleteColumnsAndVarLengthStrings(session)
+
+      // null should fail since length 0 is not >= 1
+      val minLength = new Check(CheckLevel.Error, "rule1")
+        .hasMinLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.EmptyString)))
+      // nulls should succeed since length 0 is < 2
+      val maxLength = new Check(CheckLevel.Error, "rule2")
+        .hasMaxLength("att2", _ < 2, analyzerOptions = Option(AnalyzerOptions(NullBehavior.EmptyString)))
+      val expectedColumn1 = minLength.description
+      val expectedColumn2 = maxLength.description
+
+      val suite = new VerificationSuite().onData(data)
+        .addCheck(minLength)
+        .addCheck(maxLength)
+
+      val result: VerificationResult = suite.run()
+
+      assert(result.status == CheckStatus.Error)
+
+      val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data)
+
+      resultData.show()
+      val expectedColumns: Set[String] =
+        data.columns.toSet + expectedColumn1 + expectedColumn2
+      assert(resultData.columns.toSet == expectedColumns)
+
-      val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getAs[Boolean](0))
-      assert(Seq(false, true, true, true, true, true).sameElements(rowLevel4))
+      val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getBoolean(0))
+      assert(Seq(true, true, false, true, false, true).sameElements(rowLevel1))
+
+      val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getBoolean(0))
+      assert(Seq(true, true, true, true, true, true).sameElements(rowLevel2))
     }
 
     "accept analysis config for mandatory analysis" in withSparkSession { sparkSession =>
diff --git a/src/test/scala/com/amazon/deequ/analyzers/AnalysisTest.scala b/src/test/scala/com/amazon/deequ/analyzers/AnalysisTest.scala
index cd1b8fd4d..72945e422 100644
--- a/src/test/scala/com/amazon/deequ/analyzers/AnalysisTest.scala
+++ b/src/test/scala/com/amazon/deequ/analyzers/AnalysisTest.scala
@@ -158,8 +158,14 @@ class AnalysisTest extends AnyWordSpec with Matchers with SparkContextSpec with
         fullColumn.isDefined shouldBe true
       }
 
-      resultMetrics should contain(DoubleMetric(Entity.Column, "MinLength", "att1",
-        Success(0.0)))
+      val minLengthMetric = resultMetrics.tail.head
+      inside (minLengthMetric) { case DoubleMetric(entity, name, instance, value, fullColumn) =>
+        entity shouldBe Entity.Column
+        name shouldBe "MinLength"
+        instance shouldBe "att1"
+        value shouldBe Success(0.0)
+        fullColumn.isDefined shouldBe true
+      }
     }
 
     "return the proper exception for non existing columns" in withSparkSession { sparkSession =>
diff --git a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala
index b3f9ae96c..1c8d67471 100644
--- a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala
+++ b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala
@@ -49,6 +49,32 @@ class MaxLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with
         .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, 0.0, 1.0, 0.0, 1.0)
     }
 
+    "return row-level results for null columns with NullBehavior fail option" in withSparkSession { session =>
+
+      val data = getEmptyColumnDataDf(session)
+
+      // It's null in two rows
+      val addressLength = MaxLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
+      val state: Option[MaxState] = addressLength.computeStateFrom(data)
+      val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state)
+
+      data.withColumn("new", metric.fullColumn.get)
+        .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Double.MaxValue, 1.0, Double.MaxValue, 1.0)
+    }
+
+    "return row-level results for null columns with NullBehavior empty option" in withSparkSession { session =>
+
+      val data = getEmptyColumnDataDf(session)
+
+      // It's null in two rows
+      val addressLength = MaxLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.EmptyString)))
+      val state: Option[MaxState] = addressLength.computeStateFrom(data)
+      val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state)
+
+      data.withColumn("new", metric.fullColumn.get)
+        .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, 0.0, 1.0, 0.0, 1.0)
+    }
+
     "return row-level results for blank strings" in withSparkSession { session =>
 
       val data = getEmptyColumnDataDf(session)
diff --git a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala
new file mode 100644
index 000000000..b9d706a8a
--- /dev/null
+++ b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala
@@ -0,0 +1,93 @@
+/**
+ * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+
+package com.amazon.deequ.analyzers
+
+import com.amazon.deequ.SparkContextSpec
+import com.amazon.deequ.metrics.DoubleMetric
+import com.amazon.deequ.metrics.FullColumn
+import com.amazon.deequ.utils.FixtureSupport
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+class MinLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with FixtureSupport {
+
+  "MinLength" should {
+    "return row-level results for non-null columns" in withSparkSession { session =>
+
+      val data = getDfWithStringColumns(session)
+
+      val countryLength = MinLength("Country") // It's "India" in every row
+      val state: Option[MinState] = countryLength.computeStateFrom(data)
+      val metric: DoubleMetric with FullColumn = countryLength.computeMetricFrom(state)
+
+      data.withColumn("new", metric.fullColumn.get)
+        .collect().map(_.getAs[Double]("new")) shouldBe Seq(5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0)
+    }
+
+    "return row-level results for null columns" in withSparkSession { session =>
+
+      val data = getEmptyColumnDataDf(session)
+
+      val addressLength = MinLength("att3") // It's null in two rows
+      val state: Option[MinState] = addressLength.computeStateFrom(data)
+      val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state)
+
+      data.withColumn("new", metric.fullColumn.get)
+        .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, 0.0, 1.0, 0.0, 1.0)
+    }
+
+    "return row-level results for null columns with NullBehavior fail option" in withSparkSession { session =>
+
+      val data = getEmptyColumnDataDf(session)
+
+      // It's null in two rows
+      val addressLength = MinLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
+      val state: Option[MinState] = addressLength.computeStateFrom(data)
+      val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state)
+
+      data.withColumn("new", metric.fullColumn.get)
+        .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Double.MinValue, 1.0, Double.MinValue, 1.0)
+    }
+
+    "return row-level results for null columns with NullBehavior empty option" in withSparkSession { session =>
+
+      val data = getEmptyColumnDataDf(session)
+
+      // It's null in two rows
+      val addressLength = MinLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.EmptyString)))
+      val state: Option[MinState] = addressLength.computeStateFrom(data)
+      val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state)
+
+      data.withColumn("new", metric.fullColumn.get)
+        .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, 0.0, 1.0, 0.0, 1.0)
+    }
+
+    "return row-level results for blank strings" in withSparkSession { session =>
+
+      val data = getEmptyColumnDataDf(session)
+
+      val addressLength = MinLength("att1") // It's empty strings
+      val state: Option[MinState] = addressLength.computeStateFrom(data)
+      val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state)
+
+      data.withColumn("new", metric.fullColumn.get)
+        .collect().map(_.getAs[Double]("new")) shouldBe Seq(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
+    }
+  }
+
+}
diff --git a/src/test/scala/com/amazon/deequ/analyzers/StateProviderTest.scala b/src/test/scala/com/amazon/deequ/analyzers/StateProviderTest.scala
index 0c76bc095..d1e9c7086 100644
--- a/src/test/scala/com/amazon/deequ/analyzers/StateProviderTest.scala
+++ b/src/test/scala/com/amazon/deequ/analyzers/StateProviderTest.scala
@@ -87,13 +87,13 @@ class StateProviderTest extends AnyWordSpec
     assertCorrectlyRestoresState[SumState](provider, provider, Sum("price"), data)
     assertCorrectlyRestoresState[MeanState](provider, provider, Mean("price"), data)
-    assertCorrectlyRestoresState[MinState](provider, provider, Minimum("price"), data)
+    assertCorrectlyRestoresMinState(provider, provider, Minimum("price"), data)
     assertCorrectlyRestoresMaxState(provider, provider, Maximum("price"), data)
     assertCorrectlyRestoresState[StandardDeviationState](provider, provider, StandardDeviation("price"), data)
 
     assertCorrectlyRestoresMaxState(provider, provider, MaxLength("att1"), data)
-    assertCorrectlyRestoresState[MinState](provider, provider, MinLength("att1"), data)
+    assertCorrectlyRestoresMinState(provider, provider, MinLength("att1"), data)
 
     assertCorrectlyRestoresState[DataTypeHistogram](provider, provider, DataType("item"), data)
     assertCorrectlyRestoresStateForHLL(provider, provider, ApproxCountDistinct("att1"), data)
@@ -204,6 +204,22 @@ class StateProviderTest extends AnyWordSpec
     assert(MaxState(expectedState.maxValue, None) == restoredState.get)
   }
 
+  def assertCorrectlyRestoresMinState(persister: StatePersister,
+                                      loader: StateLoader,
+                                      analyzer: Analyzer[MinState, _],
+                                      data: DataFrame): Unit = {
+
+    val stateResult = analyzer.computeStateFrom(data)
+    assert(stateResult.isDefined)
+    val expectedState = stateResult.get
+
+    persister.persist[MinState](analyzer, expectedState)
+    val restoredState = loader.load[MinState](analyzer)
+
+    assert(restoredState.isDefined)
+    assert(MinState(expectedState.minValue, None) == restoredState.get)
+  }
+
   def assertCorrectlyRestoresState[S <: State[S]](
     persister: StatePersister,
     loader: StateLoader,
diff --git a/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala b/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala
index 61cb40e41..aeec94994 100644
--- a/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala
+++ b/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala
@@ -228,10 +228,6 @@ class SimpleResultSerdeTest extends WordSpec with Matchers with SparkContextSpec
         |"instance":"att2","name":"Completeness","value":1.0},
         |{"dataset_date":1507975810,"entity":"Column","region":"EU",
         |"instance":"att1","name":"Completeness","value":1.0},
-        |{"dataset_date":1507975810,"entity":"Column","region":"EU",
-        |"instance":"att1","name":"MinLength","value":1.0},
-        |{"dataset_date":1507975810,"entity":"Column","region":"EU",
-        |"instance":"att1","name":"MaxLength","value":1.0},
         |{"dataset_date":1507975810,"entity":"Mutlicolumn","region":"EU",
         |"instance":"att1,att2","name":"MutualInformation","value":0.5623351446188083},
         |{"dataset_date":1507975810,"entity":"Dataset","region":"EU",
@@ -241,6 +237,10 @@ class SimpleResultSerdeTest extends WordSpec with Matchers with SparkContextSpec
         |{"dataset_date":1507975810,"entity":"Column","region":"EU",
         |"instance":"att1","name":"Distinctness","value":0.5},
         |{"dataset_date":1507975810,"entity":"Column","region":"EU",
+        |"instance":"att1","name":"MinLength","value":1.0},
+        |{"dataset_date":1507975810,"entity":"Column","region":"EU",
+        |"instance":"att1","name":"MaxLength","value":1.0},
+        |{"dataset_date":1507975810,"entity":"Column","region":"EU",
         |"instance":"att2","name":"Uniqueness","value":0.25}]"""
       .stripMargin.replaceAll("\n", "")
diff --git a/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala b/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala
index da54f6e5e..073f699ed 100644
--- a/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala
+++ b/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala
@@ -170,6 +170,17 @@ trait FixtureSupport {
     ).toDF("item", "att1", "att2")
   }
 
+  def getDateDf(sparkSession: SparkSession): DataFrame = {
+    import sparkSession.implicits._
+
+    Seq(
+      (100, "Furniture", "Product 1", 25, null),
+      (101, "Cosmetics", "Product 2", 20, "2022-01-05"),
+      (102, "Furniture", "Product 3", 30, null),
+      (103, "Electronics", "Product 4", 10, null),
+      (104, "Electronics", "Product 5", 50, null)
+    ).toDF("id", "product", "product_id", "units", "date")
+  }
 
   def getDfCompleteAndInCompleteColumnsDelta(sparkSession: SparkSession): DataFrame = {
     import sparkSession.implicits._
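
For context, a minimal usage sketch of the options introduced by this patch, mirroring the calls exercised in VerificationSuiteTest above. The dataset, the "att2" column, the check label and the helper name are hypothetical stand-ins; Check, VerificationSuite, AnalyzerOptions, NullBehavior and VerificationResult.rowLevelResultsAsDataFrame are the deequ entry points as they appear in this diff:

import org.apache.spark.sql.{DataFrame, SparkSession}
import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.analyzers.{AnalyzerOptions, NullBehavior}
import com.amazon.deequ.checks.{Check, CheckLevel}

def lengthRowLevelResults(session: SparkSession, data: DataFrame): DataFrame = {
  // Treat nulls as empty strings, so a null "att2" value gets length 0 instead of being ignored
  val emptyNulls = Option(AnalyzerOptions(NullBehavior.EmptyString))

  val check = new Check(CheckLevel.Error, "length bounds")
    .hasMinLength("att2", _ >= 1, analyzerOptions = emptyNulls) // null rows fail: length 0 is not >= 1
    .hasMaxLength("att2", _ < 2, analyzerOptions = emptyNulls)  // null rows pass: length 0 is < 2

  val result = new VerificationSuite().onData(data).addCheck(check).run()

  // One boolean column per length constraint, true where the row satisfies the assertion
  VerificationResult.rowLevelResultsAsDataFrame(session, result, data)
}

As the tests above show, NullBehavior.Fail would instead mark null rows as failing both length constraints, while the default (Ignore) keeps the previous behavior.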