From 82a75ea6660fb05adf1e19941f20ae31e9fc560c Mon Sep 17 00:00:00 2001
From: Eric Simmerman <ericsimmerman@gmail.com>
Date: Wed, 24 Apr 2024 08:58:28 -0400
Subject: [PATCH 1/3] Alter change column behavior so that it honors
 comparators when determining what fields have changed

---
 .../uk/co/gresearch/spark/diff/Diff.scala     | 10 ++++---
 .../spark/diff/DiffComparatorSuite.scala      | 27 +++++++++++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala
index 9106de15..86853d81 100644
--- a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala
+++ b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala
@@ -19,6 +19,7 @@ package uk.co.gresearch.spark.diff
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{ArrayType, StringType}
+import uk.co.gresearch.spark.diff.comparator.DiffComparator
 import uk.co.gresearch.spark.{backticks, distinctPrefixFor}
 
 import scala.collection.JavaConverters
@@ -144,7 +145,7 @@ class Differ(options: DiffOptions) {
 
   private def getChangeColumn(
       existsColumnName: String,
-      valueColumns: Seq[String],
+      valueVolumnsWithComparator: Seq[(String, DiffComparator)],
       left: Dataset[_],
       right: Dataset[_]
   ): Option[Column] = {
@@ -152,12 +153,12 @@ class Differ(options: DiffOptions) {
       .map(changeColumn =>
         when(left(existsColumnName).isNull || right(existsColumnName).isNull, lit(null))
           .otherwise(
-            Some(valueColumns.toSeq)
+            Some(valueVolumnsWithComparator)
               .filter(_.nonEmpty)
               .map(columns =>
                 concat(
                   columns
-                    .map(c => when(left(backticks(c)) <=> right(backticks(c)), array()).otherwise(array(lit(c)))): _*
+                    .map(entry => when(entry._2.equiv(left(backticks(entry._1)), right(backticks(entry._1))), array()).otherwise(array(lit(entry._1)))): _*
                 )
               )
               .getOrElse(
@@ -282,6 +283,7 @@ class Differ(options: DiffOptions) {
         cmp.equiv(leftWithExists(backticks(c)), rightWithExists(backticks(c)))
       }
       .reduceOption(_ && _)
+
     val changeCondition = not(unChanged.getOrElse(lit(true)))
 
     val diffActionColumn =
@@ -292,7 +294,7 @@ class Differ(options: DiffOptions) {
         .as(options.diffColumn)
 
     val diffColumns = getDiffColumns(pkColumns, valueColumns, left, right, ignoreColumns).map(_._2)
-    val changeColumn = getChangeColumn(existsColumnName, valueColumns, leftWithExists, rightWithExists)
+    val changeColumn = getChangeColumn(existsColumnName, valueVolumnsWithComparator, leftWithExists, rightWithExists)
       // turn this column into a sequence of one or none column so we can easily concat it below with diffActionColumn and diffColumns
       .map(Seq(_))
       .getOrElse(Seq.empty[Column])
diff --git a/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala
index 5b6022a7..246ad886 100644
--- a/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala
+++ b/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala
@@ -412,6 +412,33 @@ class DiffComparatorSuite extends AnyFunSuite with SparkTestSession {
         DiffOptions.default.withComparator(DiffComparators.duration(Duration.ofSeconds(61)).asExclusive(), "time")
       doTest(optionsWithTightComparator, optionsWithRelaxedComparator, leftTimes.toDF, rightTimes.toDF)
     }
+
+    test("changeset accounts for comparators") {
+      val changesetOptions = DiffOptions.default
+        .withComparator(DiffComparators.epsilon(10).asAbsolute().asInclusive(), "longValue")
+        .withChangeColumn("changeset")
+
+      lazy val left: Dataset[Numbers] = Seq(
+        Numbers(1, 1L, 1.0f, 1.0, Decimal(10, 8, 3), None, None),
+        Numbers(2, 2L, 2.0f, 2.0, Decimal(20, 8, 3), Some(2), Some(2L)),
+        Numbers(3, 3L, 3.0f, 3.0, Decimal(30, 8, 3), Some(3), Some(3L)),
+        Numbers(4, 4L, 4.0f, 4.0, Decimal(40, 8, 3), Some(4), None),
+        Numbers(5, 5L, 5.0f, 5.0, Decimal(50, 8, 3), None, Some(5L)),
+      ).toDS()
+
+      lazy val right: Dataset[Numbers] = Seq(
+        Numbers(1, 1L, 1.0f, 1.0, Decimal(10, 8, 3), None, None),
+        Numbers(2, 8L, 2.0f, 2.0, Decimal(20, 8, 3), Some(2), Some(2L)),
+        Numbers(3, 9L, 6.0f, 3.0, Decimal(30, 8, 3), Some(3), Some(3L)),
+        Numbers(4, 10L, 4.0f, 4.0, Decimal(40, 8, 3), Some(4), None),
+        Numbers(5, 11L, 5.0f, 5.0, Decimal(50, 8, 3), None, Some(5L)),
+      ).toDS()
+
+      val rs = left.diff(right, changesetOptions, "id")
+      assert(rs.where($"diff" === "C").count() == 1, "Only id=3 should differ with the numeric comparator applied")
+      val differingRow: Row = rs.where($"diff" === "C").head
+      assert(differingRow.getList(1).size() == 1, "Only floatVal differs after considering the comparators so the changeset should be size 1")
+    }
   }
 
   Seq(true, false).foreach { sensitive =>

From 499dfa179377f761e8c4ba522df46592f231aefe Mon Sep 17 00:00:00 2001
From: Eric Simmerman <ericsimmerman@gmail.com>
Date: Wed, 24 Apr 2024 10:29:28 -0400
Subject: [PATCH 2/3] Address requested changes

---
 .../scala/uk/co/gresearch/spark/diff/Diff.scala   |  4 +++-
 .../spark/diff/DiffComparatorSuite.scala          | 15 ++++++---------
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala
index 86853d81..1573b27f 100644
--- a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala
+++ b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala
@@ -158,7 +158,9 @@ class Differ(options: DiffOptions) {
               .map(columns =>
                 concat(
                   columns
-                    .map(entry => when(entry._2.equiv(left(backticks(entry._1)), right(backticks(entry._1))), array()).otherwise(array(lit(entry._1)))): _*
+                    .map { case (c, cmp) =>
+                      when(cmp.equiv(left(backticks(c)), right(backticks(c))), array()).otherwise(array(lit(c)))
+                    }: _*
                 )
               )
               .getOrElse(
diff --git a/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala
index 246ad886..0b80ed51 100644
--- a/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala
+++ b/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala
@@ -24,15 +24,12 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.scalatest.funsuite.AnyFunSuite
 import uk.co.gresearch.spark.SparkTestSession
-import uk.co.gresearch.spark.diff.DiffComparatorSuite.{
-  decimalEnc,
-  optionsWithRelaxedComparators,
-  optionsWithTightComparators
-}
+import uk.co.gresearch.spark.diff.DiffComparatorSuite.{decimalEnc, optionsWithRelaxedComparators, optionsWithTightComparators}
 import uk.co.gresearch.spark.diff.comparator._
 
 import java.sql.{Date, Timestamp}
 import java.time.Duration
+import java.util
 
 case class Numbers(
     id: Int,
@@ -434,10 +431,10 @@ class DiffComparatorSuite extends AnyFunSuite with SparkTestSession {
         Numbers(5, 11L, 5.0f, 5.0, Decimal(50, 8, 3), None, Some(5L)),
       ).toDS()
 
-      val rs = left.diff(right, changesetOptions, "id")
-      assert(rs.where($"diff" === "C").count() == 1, "Only id=3 should differ with the numeric comparator applied")
-      val differingRow: Row = rs.where($"diff" === "C").head
-      assert(differingRow.getList(1).size() == 1, "Only floatVal differs after considering the comparators so the changeset should be size 1")
+      val rs = left.diff(right, changesetOptions, "id").where($"diff" === "C")
+      assert(rs.count() == 1, "Only one row should differ with the numeric comparator applied")
+      val changesInDifferingRow: util.List[String] = rs.head.getList[String](1)
+      assert(changesInDifferingRow.get(0) == "floatValue", "Only floatVal differs after considering the comparators so the changeset should be size 1")
     }
   }
 

From 2c5124efc2fdcb0d1dc443f2dd37a7d7f2bfacaa Mon Sep 17 00:00:00 2001
From: Eric Simmerman <ericsimmerman@gmail.com>
Date: Wed, 24 Apr 2024 16:58:16 -0400
Subject: [PATCH 3/3] Linter updates

---
 .../co/gresearch/spark/diff/DiffComparatorSuite.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala
index 0b80ed51..af43781c 100644
--- a/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala
+++ b/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala
@@ -24,7 +24,11 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.scalatest.funsuite.AnyFunSuite
 import uk.co.gresearch.spark.SparkTestSession
-import uk.co.gresearch.spark.diff.DiffComparatorSuite.{decimalEnc, optionsWithRelaxedComparators, optionsWithTightComparators}
+import uk.co.gresearch.spark.diff.DiffComparatorSuite.{
+  decimalEnc,
+  optionsWithRelaxedComparators,
+  optionsWithTightComparators
+}
 import uk.co.gresearch.spark.diff.comparator._
 
 import java.sql.{Date, Timestamp}
@@ -434,7 +438,10 @@ class DiffComparatorSuite extends AnyFunSuite with SparkTestSession {
       val rs = left.diff(right, changesetOptions, "id").where($"diff" === "C")
       assert(rs.count() == 1, "Only one row should differ with the numeric comparator applied")
       val changesInDifferingRow: util.List[String] = rs.head.getList[String](1)
-      assert(changesInDifferingRow.get(0) == "floatValue", "Only floatVal differs after considering the comparators so the changeset should be size 1")
+      assert(
+        changesInDifferingRow.get(0) == "floatValue",
+        "Only floatVal differs after considering the comparators so the changeset should be size 1"
+      )
     }
   }