Fix Change Feed Full Fidelity Tests #43483

Merged: 8 commits, Jan 10, 2025
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)

#### Other Changes
* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)

#### Other Changes
* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)

#### Other Changes
* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)

#### Other Changes
* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)

#### Other Changes
* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)
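
For context on the bug-fix entries above: in full fidelity (all versions and deletes) change feed mode, a delete entry carries no current image, and its previous image can also be absent, so the row converter cannot assume either node exists. The standalone Scala sketch below is illustrative only and not code from this PR; the "metadata"/"operationType" field names are assumptions mirroring the current/previous/metadata layout the converter change below works with.

    import com.fasterxml.jackson.databind.ObjectMapper

    // Illustrative sketch: a minimal full fidelity change feed entry for a delete.
    // There is no "current" image, and the "previous" image may also be missing,
    // which is exactly the case the null checks in the converter below guard against.
    object DeleteEntryShapeSketch {
      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper()
        val deleteEntry = mapper.createObjectNode()
        deleteEntry.putObject("metadata").put("operationType", "delete")
        // Intentionally no "current" node and no "previous" node.
        println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(deleteEntry))
      }
    }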
@@ -705,28 +705,40 @@ private[cosmos] class CosmosRowConverterBase(

  private def parseId(objectNode: ObjectNode): String = {
    val currentNode = getCurrentOrPreviousNode(objectNode)
-   currentNode.get(IdAttributeName) match {
-     case valueNode: JsonNode =>
-       Option(valueNode).fold(null: String)(v => v.asText(null))
-     case _ => null
+   if (currentNode != null) {
+     currentNode.get(IdAttributeName) match {
+       case valueNode: JsonNode =>
+         Option(valueNode).fold(null: String)(v => v.asText(null))
+       case _ => null
+     }
+   } else {
+     null
    }
  }

  private def parseTimestamp(objectNode: ObjectNode): Long = {
    val currentNode = getCurrentOrPreviousNode(objectNode)
-   currentNode.get(TimestampAttributeName) match {
-     case valueNode: JsonNode =>
-       Option(valueNode).fold(-1L)(v => v.asLong(-1))
-     case _ => -1L
+   if (currentNode != null) {
+     currentNode.get(TimestampAttributeName) match {
+       case valueNode: JsonNode =>
+         Option(valueNode).fold(-1L)(v => v.asLong(-1))
+       case _ => -1L
+     }
+   } else {
+     -1L
    }
  }

  private def parseETag(objectNode: ObjectNode): String = {
    val currentNode = getCurrentOrPreviousNode(objectNode)
-   currentNode.get(ETagAttributeName) match {
-     case valueNode: JsonNode =>
-       Option(valueNode).fold(null: String)(v => v.asText(null))
-     case _ => null
+   if (currentNode != null) {
+     currentNode.get(ETagAttributeName) match {
+       case valueNode: JsonNode =>
+         Option(valueNode).fold(null: String)(v => v.asText(null))
+       case _ => null
+     }
+   } else {
+     null
    }
  }
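
The three parsers above now repeat the same null guard. A purely hypothetical alternative, not part of this PR, is to fold the guard into a single Option-based accessor; stringAttribute is an invented name, and the node passed in stands for whatever getCurrentOrPreviousNode returned, which can be null for a delete without a previous image.

    import com.fasterxml.jackson.databind.JsonNode

    // Hypothetical sketch: one null-safe accessor instead of an if/else per parser.
    object NullSafeAttributeSketch {
      def stringAttribute(currentOrPreviousNode: JsonNode, attributeName: String): String =
        Option(currentOrPreviousNode)                        // the node itself may be null for deletes
          .flatMap(node => Option(node.get(attributeName)))  // the attribute may be missing
          .map(_.asText(null))
          .orNull

      def main(args: Array[String]): Unit = {
        // A delete without a previous image resolves to null instead of throwing a NullPointerException.
        println(stringAttribute(null, "id"))
      }
    }

The explicit if/else chosen in the PR keeps each parser's default (null or -1L) visible at the call site, which is a reasonable trade-off against the shared helper.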

@@ -5,9 +5,11 @@ package com.azure.cosmos.spark
import com.azure.cosmos.SparkBridgeInternal
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState
import com.azure.cosmos.implementation.{TestConfigurations, Utils}
import com.azure.cosmos.models.PartitionKey
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.spark.udf.{CreateChangeFeedOffsetFromSpark2, CreateSpark2ContinuationsFromChangeFeedOffset, GetFeedRangeForPartitionKeyValue}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions
import org.apache.spark.sql.types._

import java.io.{BufferedReader, InputStreamReader}
@@ -442,104 +444,104 @@ class SparkE2EChangeFeedITest
    rowsArray2 should have size 50 - initialCount
  }

// "spark change feed query (full fidelity)" should "honor checkpoint location" in {
// val cosmosEndpoint = TestConfigurations.HOST
// val cosmosMasterKey = TestConfigurations.MASTER_KEY
//
// val checkpointLocation = s"/tmp/checkpoints/${UUID.randomUUID().toString}"
// val cfg = Map(
// "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
// "spark.cosmos.accountKey" -> cosmosMasterKey,
// "spark.cosmos.database" -> cosmosDatabase,
// "spark.cosmos.container" -> cosmosContainer,
// "spark.cosmos.read.inferSchema.enabled" -> "false",
// "spark.cosmos.changeFeed.mode" -> "FullFidelity",
// "spark.cosmos.changeFeed.startFrom" -> "NOW",
// "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
// "spark.cosmos.changeFeed.batchCheckpointLocation" -> checkpointLocation
// )
//
// val df1 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
// val rowsArray1 = df1.collect()
// rowsArray1.length == 0 shouldEqual true
//
// df1.schema.equals(
// ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled) shouldEqual true
//
// val hdfs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
//
// val startOffsetFolderLocation = Paths.get(checkpointLocation, "startOffset").toString
// val startOffsetFileLocation = Paths.get(startOffsetFolderLocation, "0").toString
// hdfs.exists(new Path(startOffsetFolderLocation)) shouldEqual true
// hdfs.exists(new Path(startOffsetFileLocation)) shouldEqual false
//
// val latestOffsetFolderLocation = Paths.get(checkpointLocation, "latestOffset").toString
// val latestOffsetFileLocation = Paths.get(latestOffsetFolderLocation, "0").toString
// hdfs.exists(new Path(latestOffsetFolderLocation)) shouldEqual true
// hdfs.exists(new Path(latestOffsetFileLocation)) shouldEqual true
//
// // TODO - check for the offset structure to make sure it looks like the new lease format.
//
// hdfs.copyToLocalFile(true, new Path(latestOffsetFileLocation), new Path(startOffsetFileLocation))
// assert(!hdfs.exists(new Path(latestOffsetFileLocation)))
//
// val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)
//
// val createdObjectIds = new ArrayBuffer[String]()
// val replacedObjectIds = new ArrayBuffer[String]()
// val deletedObjectIds = new ArrayBuffer[String]()
// for (sequenceNumber <- 1 to 5) {
// val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
// objectNode.put("name", "Shrodigner's cat")
// objectNode.put("type", "cat")
// objectNode.put("age", 20)
// objectNode.put("sequenceNumber", sequenceNumber)
// val id = UUID.randomUUID().toString
// objectNode.put("id", id)
// createdObjectIds += id
// if (sequenceNumber % 2 == 0) {
// replacedObjectIds += id
// }
// if (sequenceNumber % 3 == 0) {
// deletedObjectIds += id
// }
// container.createItem(objectNode).block()
// }
//
// for (id <- replacedObjectIds) {
// val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
// objectNode.put("name", "Shrodigner's cat")
// objectNode.put("type", "dog")
// objectNode.put("age", 25)
// objectNode.put("id", id)
// container.replaceItem(objectNode, id, new PartitionKey(id)).block()
// }
//
// for (id <- deletedObjectIds) {
// container.deleteItem(id, new PartitionKey(id)).block()
// }
//
// // wait for the log store to get these changes
// Thread.sleep(2000)
//
// val df2 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
// val groupedFrame = df2.groupBy(CosmosTableSchemaInferrer.OperationTypeAttributeName)
// .agg(functions.collect_list("id").as("ids"))
//
// val collectedFrame = groupedFrame.collect()
// collectedFrame.foreach(row => {
// val wrappedArray = row.get(1).asInstanceOf[mutable.WrappedArray[String]]
// val array = wrappedArray.array
// row.get(0) match {
// case "create" =>
// validateArraysUnordered(createdObjectIds, array)
// case "replace" =>
// validateArraysUnordered(replacedObjectIds, array)
// case "delete" =>
// validateArraysUnordered(deletedObjectIds, array)
// }
// })
// }
"spark change feed query (full fidelity)" should "honor checkpoint location" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY

val checkpointLocation = s"/tmp/checkpoints/${UUID.randomUUID().toString}"
val cfg = Map(
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.inferSchema.enabled" -> "false",
"spark.cosmos.changeFeed.mode" -> "FullFidelity",
"spark.cosmos.changeFeed.startFrom" -> "NOW",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive",
"spark.cosmos.changeFeed.batchCheckpointLocation" -> checkpointLocation
)

val df1 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
val rowsArray1 = df1.collect()
rowsArray1.length == 0 shouldEqual true

df1.schema.equals(
ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled) shouldEqual true

val hdfs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)

val startOffsetFolderLocation = Paths.get(checkpointLocation, "startOffset").toString
val startOffsetFileLocation = Paths.get(startOffsetFolderLocation, "0").toString
hdfs.exists(new Path(startOffsetFolderLocation)) shouldEqual true
hdfs.exists(new Path(startOffsetFileLocation)) shouldEqual false

val latestOffsetFolderLocation = Paths.get(checkpointLocation, "latestOffset").toString
val latestOffsetFileLocation = Paths.get(latestOffsetFolderLocation, "0").toString
hdfs.exists(new Path(latestOffsetFolderLocation)) shouldEqual true
hdfs.exists(new Path(latestOffsetFileLocation)) shouldEqual true

// TODO - check for the offset structure to make sure it looks like the new lease format.

hdfs.copyToLocalFile(true, new Path(latestOffsetFileLocation), new Path(startOffsetFileLocation))
assert(!hdfs.exists(new Path(latestOffsetFileLocation)))

val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)

val createdObjectIds = new ArrayBuffer[String]()
val replacedObjectIds = new ArrayBuffer[String]()
val deletedObjectIds = new ArrayBuffer[String]()
for (sequenceNumber <- 1 to 5) {
val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
objectNode.put("name", "Shrodigner's cat")
objectNode.put("type", "cat")
objectNode.put("age", 20)
objectNode.put("sequenceNumber", sequenceNumber)
val id = UUID.randomUUID().toString
objectNode.put("id", id)
createdObjectIds += id
if (sequenceNumber % 2 == 0) {
replacedObjectIds += id
}
if (sequenceNumber % 3 == 0) {
deletedObjectIds += id
}
container.createItem(objectNode).block()
}

for (id <- replacedObjectIds) {
val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
objectNode.put("name", "Shrodigner's cat")
objectNode.put("type", "dog")
objectNode.put("age", 25)
objectNode.put("id", id)
container.replaceItem(objectNode, id, new PartitionKey(id)).block()
}

for (id <- deletedObjectIds) {
container.deleteItem(id, new PartitionKey(id)).block()
}

// wait for the log store to get these changes
Thread.sleep(2000)

val df2 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
val groupedFrame = df2.groupBy(CosmosTableSchemaInferrer.OperationTypeAttributeName)
.agg(functions.collect_list("id").as("ids"))

val collectedFrame = groupedFrame.collect()
collectedFrame.foreach(row => {
val wrappedArray = row.get(1).asInstanceOf[mutable.WrappedArray[String]]
val array = wrappedArray.array
row.get(0) match {
case "create" =>
validateArraysUnordered(createdObjectIds, array)
case "replace" =>
validateArraysUnordered(replacedObjectIds, array)
case "delete" =>
validateArraysUnordered(deletedObjectIds, array)
}
})
}
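
The re-enabled test above reads the full fidelity feed in batch mode with spark.cosmos.changeFeed.batchCheckpointLocation. For comparison, here is a minimal hypothetical sketch, not part of this PR or the test suite, of consuming the same feed as a Structured Streaming source; the endpoint, key, database, and container values are placeholders, and the console sink with checkpointLocation are standard Spark streaming options rather than anything introduced here.

    import org.apache.spark.sql.SparkSession

    // Hypothetical sketch: streaming read of the full fidelity change feed.
    object FullFidelityStreamSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("full-fidelity-change-feed-sketch").getOrCreate()

        val changeFeed = spark.readStream
          .format("cosmos.oltp.changeFeed")
          .option("spark.cosmos.accountEndpoint", sys.env("COSMOS_ENDPOINT")) // placeholder
          .option("spark.cosmos.accountKey", sys.env("COSMOS_KEY"))           // placeholder
          .option("spark.cosmos.database", "SampleDatabase")                  // placeholder
          .option("spark.cosmos.container", "SampleContainer")                // placeholder
          .option("spark.cosmos.read.inferSchema.enabled", "false")
          .option("spark.cosmos.changeFeed.mode", "FullFidelity")
          .option("spark.cosmos.changeFeed.startFrom", "NOW")
          .load()

        // The checkpoint makes the stream resumable, analogous to
        // spark.cosmos.changeFeed.batchCheckpointLocation in the batch test above.
        val query = changeFeed.writeStream
          .format("console")
          .option("checkpointLocation", "/tmp/checkpoints/full-fidelity-sketch")
          .start()

        query.awaitTermination()
      }
    }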

"spark change feed query (incremental)" can "proceed with simulated Spark2 Checkpoint" in {
val cosmosEndpoint = TestConfigurations.HOST