Fix Change Feed Full Fidelity Tests (#43483)
* Fix Java SDK tests for the new emulator behavior for full fidelity change feed (sketched below)

* Change Spark to ignore the previous node for change feed

* Update changelog

* Fix tests

* React to comments

* Merge with main

* Fix test
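
For context, the emulator change behind these fixes affects the full fidelity (all versions and deletes) change feed: a delete event carries an empty current image and may carry no previous image at all. The snippet below is a minimal sketch of that situation; the payload shape and field values are illustrative assumptions, not taken from this commit.

import com.fasterxml.jackson.databind.ObjectMapper

object FullFidelityDeleteShapeSketch {
  def main(args: Array[String]): Unit = {
    // Assumed shape of a full fidelity delete event under the new emulator behavior:
    // the "current" image is empty and no "previous" image is retained.
    val deleteEvent =
      """{
        |  "current": {},
        |  "metadata": { "operationType": "delete", "lsn": 17 }
        |}""".stripMargin

    val root = new ObjectMapper().readTree(deleteEvent)

    // Both images can be unusable at once, which is why the Spark row converter in this
    // commit now guards against a null current/previous node instead of dereferencing it.
    println(root.get("current").size())   // 0    -> the current image carries no fields
    println(root.get("previous") == null) // true -> no previous image to fall back to
  }
}

With neither image available, the converter's id, timestamp, and etag lookups have nothing to read; the changes below return null (or -1 for the timestamp) in that case instead of throwing.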
tvaron3 authored Jan 10, 2025
1 parent 15b2a10 commit f6a6562
Showing 9 changed files with 154 additions and 137 deletions.
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)

#### Other Changes
* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)

#### Other Changes
* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)

#### Other Changes
* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)

#### Other Changes
* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)

#### Other Changes
* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)
@@ -705,28 +705,40 @@ private[cosmos] class CosmosRowConverterBase(

   private def parseId(objectNode: ObjectNode): String = {
     val currentNode = getCurrentOrPreviousNode(objectNode)
-    currentNode.get(IdAttributeName) match {
-      case valueNode: JsonNode =>
-        Option(valueNode).fold(null: String)(v => v.asText(null))
-      case _ => null
+    if (currentNode != null) {
+      currentNode.get(IdAttributeName) match {
+        case valueNode: JsonNode =>
+          Option(valueNode).fold(null: String)(v => v.asText(null))
+        case _ => null
+      }
+    } else {
+      null
     }
   }

   private def parseTimestamp(objectNode: ObjectNode): Long = {
     val currentNode = getCurrentOrPreviousNode(objectNode)
-    currentNode.get(TimestampAttributeName) match {
-      case valueNode: JsonNode =>
-        Option(valueNode).fold(-1L)(v => v.asLong(-1))
-      case _ => -1L
+    if (currentNode != null) {
+      currentNode.get(TimestampAttributeName) match {
+        case valueNode: JsonNode =>
+          Option(valueNode).fold(-1L)(v => v.asLong(-1))
+        case _ => -1L
+      }
+    } else {
+      -1L
     }
   }

   private def parseETag(objectNode: ObjectNode): String = {
     val currentNode = getCurrentOrPreviousNode(objectNode)
-    currentNode.get(ETagAttributeName) match {
-      case valueNode: JsonNode =>
-        Option(valueNode).fold(null: String)(v => v.asText(null))
-      case _ => null
+    if (currentNode != null) {
+      currentNode.get(ETagAttributeName) match {
+        case valueNode: JsonNode =>
+          Option(valueNode).fold(null: String)(v => v.asText(null))
+        case _ => null
+      }
+    } else {
+      null
     }
   }
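
All three helpers above start from getCurrentOrPreviousNode, which lies outside this hunk. A rough sketch of such a fallback, assuming the change feed record exposes current and previous object nodes (the field names and body are assumptions, not the connector's actual implementation), shows why the result can legitimately be null for deletes:

import com.fasterxml.jackson.databind.node.ObjectNode

// Assumed fallback inside the row converter: prefer a populated "current" image,
// otherwise use "previous", and return null when neither is present - the case the
// null checks above now cover.
def getCurrentOrPreviousNodeSketch(objectNode: ObjectNode): ObjectNode = {
  val current = objectNode.get("current")
  if (current != null && current.isObject && current.size() > 0) {
    current.asInstanceOf[ObjectNode]
  } else {
    objectNode.get("previous") match {
      case previous: ObjectNode => previous
      case _ => null
    }
  }
}

Callers then see the sentinel values chosen above - null for the id and etag, -1L for the timestamp - rather than a NullPointerException.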

@@ -5,9 +5,11 @@ package com.azure.cosmos.spark
import com.azure.cosmos.SparkBridgeInternal
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState
import com.azure.cosmos.implementation.{TestConfigurations, Utils}
import com.azure.cosmos.models.PartitionKey
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.spark.udf.{CreateChangeFeedOffsetFromSpark2, CreateSpark2ContinuationsFromChangeFeedOffset, GetFeedRangeForPartitionKeyValue}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions
import org.apache.spark.sql.types._

import java.io.{BufferedReader, InputStreamReader}
@@ -442,104 +444,104 @@ class SparkE2EChangeFeedITest
rowsArray2 should have size 50 - initialCount
}

// "spark change feed query (full fidelity)" should "honor checkpoint location" in {
// val cosmosEndpoint = TestConfigurations.HOST
// val cosmosMasterKey = TestConfigurations.MASTER_KEY
//
// val checkpointLocation = s"/tmp/checkpoints/${UUID.randomUUID().toString}"
// val cfg = Map(
// "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
// "spark.cosmos.accountKey" -> cosmosMasterKey,
// "spark.cosmos.database" -> cosmosDatabase,
// "spark.cosmos.container" -> cosmosContainer,
// "spark.cosmos.read.inferSchema.enabled" -> "false",
// "spark.cosmos.changeFeed.mode" -> "FullFidelity",
// "spark.cosmos.changeFeed.startFrom" -> "NOW",
// "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
// "spark.cosmos.changeFeed.batchCheckpointLocation" -> checkpointLocation
// )
//
// val df1 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
// val rowsArray1 = df1.collect()
// rowsArray1.length == 0 shouldEqual true
//
// df1.schema.equals(
// ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled) shouldEqual true
//
// val hdfs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
//
// val startOffsetFolderLocation = Paths.get(checkpointLocation, "startOffset").toString
// val startOffsetFileLocation = Paths.get(startOffsetFolderLocation, "0").toString
// hdfs.exists(new Path(startOffsetFolderLocation)) shouldEqual true
// hdfs.exists(new Path(startOffsetFileLocation)) shouldEqual false
//
// val latestOffsetFolderLocation = Paths.get(checkpointLocation, "latestOffset").toString
// val latestOffsetFileLocation = Paths.get(latestOffsetFolderLocation, "0").toString
// hdfs.exists(new Path(latestOffsetFolderLocation)) shouldEqual true
// hdfs.exists(new Path(latestOffsetFileLocation)) shouldEqual true
//
// // TODO - check for the offset structure to make sure it looks like the new lease format.
//
// hdfs.copyToLocalFile(true, new Path(latestOffsetFileLocation), new Path(startOffsetFileLocation))
// assert(!hdfs.exists(new Path(latestOffsetFileLocation)))
//
// val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)
//
// val createdObjectIds = new ArrayBuffer[String]()
// val replacedObjectIds = new ArrayBuffer[String]()
// val deletedObjectIds = new ArrayBuffer[String]()
// for (sequenceNumber <- 1 to 5) {
// val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
// objectNode.put("name", "Shrodigner's cat")
// objectNode.put("type", "cat")
// objectNode.put("age", 20)
// objectNode.put("sequenceNumber", sequenceNumber)
// val id = UUID.randomUUID().toString
// objectNode.put("id", id)
// createdObjectIds += id
// if (sequenceNumber % 2 == 0) {
// replacedObjectIds += id
// }
// if (sequenceNumber % 3 == 0) {
// deletedObjectIds += id
// }
// container.createItem(objectNode).block()
// }
//
// for (id <- replacedObjectIds) {
// val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
// objectNode.put("name", "Shrodigner's cat")
// objectNode.put("type", "dog")
// objectNode.put("age", 25)
// objectNode.put("id", id)
// container.replaceItem(objectNode, id, new PartitionKey(id)).block()
// }
//
// for (id <- deletedObjectIds) {
// container.deleteItem(id, new PartitionKey(id)).block()
// }
//
// // wait for the log store to get these changes
// Thread.sleep(2000)
//
// val df2 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
// val groupedFrame = df2.groupBy(CosmosTableSchemaInferrer.OperationTypeAttributeName)
// .agg(functions.collect_list("id").as("ids"))
//
// val collectedFrame = groupedFrame.collect()
// collectedFrame.foreach(row => {
// val wrappedArray = row.get(1).asInstanceOf[mutable.WrappedArray[String]]
// val array = wrappedArray.array
// row.get(0) match {
// case "create" =>
// validateArraysUnordered(createdObjectIds, array)
// case "replace" =>
// validateArraysUnordered(replacedObjectIds, array)
// case "delete" =>
// validateArraysUnordered(deletedObjectIds, array)
// }
// })
// }
"spark change feed query (full fidelity)" should "honor checkpoint location" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY

val checkpointLocation = s"/tmp/checkpoints/${UUID.randomUUID().toString}"
val cfg = Map(
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.inferSchema.enabled" -> "false",
"spark.cosmos.changeFeed.mode" -> "FullFidelity",
"spark.cosmos.changeFeed.startFrom" -> "NOW",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive",
"spark.cosmos.changeFeed.batchCheckpointLocation" -> checkpointLocation
)

val df1 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
val rowsArray1 = df1.collect()
rowsArray1.length == 0 shouldEqual true

df1.schema.equals(
ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled) shouldEqual true

val hdfs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)

val startOffsetFolderLocation = Paths.get(checkpointLocation, "startOffset").toString
val startOffsetFileLocation = Paths.get(startOffsetFolderLocation, "0").toString
hdfs.exists(new Path(startOffsetFolderLocation)) shouldEqual true
hdfs.exists(new Path(startOffsetFileLocation)) shouldEqual false

val latestOffsetFolderLocation = Paths.get(checkpointLocation, "latestOffset").toString
val latestOffsetFileLocation = Paths.get(latestOffsetFolderLocation, "0").toString
hdfs.exists(new Path(latestOffsetFolderLocation)) shouldEqual true
hdfs.exists(new Path(latestOffsetFileLocation)) shouldEqual true

// TODO - check for the offset structure to make sure it looks like the new lease format.

hdfs.copyToLocalFile(true, new Path(latestOffsetFileLocation), new Path(startOffsetFileLocation))
assert(!hdfs.exists(new Path(latestOffsetFileLocation)))

val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)

val createdObjectIds = new ArrayBuffer[String]()
val replacedObjectIds = new ArrayBuffer[String]()
val deletedObjectIds = new ArrayBuffer[String]()
for (sequenceNumber <- 1 to 5) {
val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
objectNode.put("name", "Shrodigner's cat")
objectNode.put("type", "cat")
objectNode.put("age", 20)
objectNode.put("sequenceNumber", sequenceNumber)
val id = UUID.randomUUID().toString
objectNode.put("id", id)
createdObjectIds += id
if (sequenceNumber % 2 == 0) {
replacedObjectIds += id
}
if (sequenceNumber % 3 == 0) {
deletedObjectIds += id
}
container.createItem(objectNode).block()
}

for (id <- replacedObjectIds) {
val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
objectNode.put("name", "Shrodigner's cat")
objectNode.put("type", "dog")
objectNode.put("age", 25)
objectNode.put("id", id)
container.replaceItem(objectNode, id, new PartitionKey(id)).block()
}

for (id <- deletedObjectIds) {
container.deleteItem(id, new PartitionKey(id)).block()
}

// wait for the log store to get these changes
Thread.sleep(2000)

val df2 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
val groupedFrame = df2.groupBy(CosmosTableSchemaInferrer.OperationTypeAttributeName)
.agg(functions.collect_list("id").as("ids"))

val collectedFrame = groupedFrame.collect()
collectedFrame.foreach(row => {
val wrappedArray = row.get(1).asInstanceOf[mutable.WrappedArray[String]]
val array = wrappedArray.array
row.get(0) match {
case "create" =>
validateArraysUnordered(createdObjectIds, array)
case "replace" =>
validateArraysUnordered(replacedObjectIds, array)
case "delete" =>
validateArraysUnordered(deletedObjectIds, array)
}
})
}
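
validateArraysUnordered is defined elsewhere in the test class and is not part of this diff. Assuming the suite mixes in ScalaTest matchers (as the surrounding shouldEqual assertions suggest), a hypothetical stand-in with the behavior the test relies on - an order-insensitive comparison of expected and actual ids - could look like this:

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the helper used above: the ids are unique UUIDs, so an
// order-insensitive comparison of the two collections as sets is sufficient.
def validateArraysUnordered(expected: ArrayBuffer[String], actual: Array[String]): Unit = {
  actual.toSet shouldEqual expected.toSet
}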

"spark change feed query (incremental)" can "proceed with simulated Spark2 Checkpoint" in {
val cosmosEndpoint = TestConfigurations.HOST