Skip to content

Commit

Permalink
Simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
allisonport-db committed Feb 20, 2025
1 parent 562cfe4 commit aae9d64
Showing 1 changed file with 27 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ package io.delta.kernel.defaults
import java.io.File
import java.nio.file.Files
import java.util
import java.util.Collections.emptyMap
import java.util.Optional

import scala.collection.immutable.Seq
import scala.jdk.CollectionConverters.setAsJavaSetConverter

import io.delta.kernel.{Operation, Table, Transaction}
import io.delta.kernel.{Operation, Table}
import io.delta.kernel.data.Row
import io.delta.kernel.defaults.utils.TestUtils
import io.delta.kernel.engine.Engine
Expand All @@ -35,9 +33,11 @@ import io.delta.kernel.internal.checksum.{ChecksumReader, CRCInfo}
import io.delta.kernel.internal.fs.Path
import io.delta.kernel.internal.util.FileNames
import io.delta.kernel.internal.util.Utils.toCloseableIterator
import io.delta.kernel.types.IntegerType.INTEGER
import io.delta.kernel.types.LongType.LONG
import io.delta.kernel.types.StructType
import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable};
import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable}

import org.apache.spark.sql.functions.col

/**
* Test suite to verify checksum file correctness by comparing
Expand All @@ -58,16 +58,18 @@ class ChecksumSimpleComparisonSuite extends DeltaTableWriteSuiteBase with TestUt
engine,
kernelTablePath,
isNewTable = true,
schema = new StructType().add("id", INTEGER),
schema = new StructType().add("id", LONG),
partCols = Seq.empty).commit(engine, emptyIterable())
.getPostCommitHooks
.forEach(hook => hook.threadSafeInvoke(engine))
spark.sql(s"CREATE OR REPLACE TABLE delta.`${sparkTablePath}` (id Integer) USING DELTA")
spark.sql(s"CREATE OR REPLACE TABLE delta.`${sparkTablePath}` (id LONG) USING DELTA")
assertChecksumEquals(engine, sparkTablePath, kernelTablePath, 0)

(1 to 10).foreach(version =>
insertIntoUnpartitionedTableAndCheckCrc(engine, sparkTablePath, kernelTablePath, version))

(1 to 10).foreach { version =>
spark.range(0, version).write.format("delta").mode("append").save(sparkTablePath)
commitSparkChangeToKernel(kernelTablePath, engine, sparkTablePath, version)
assertChecksumEquals(engine, sparkTablePath, kernelTablePath, version)
}
}
}

Expand All @@ -80,77 +82,24 @@ class ChecksumSimpleComparisonSuite extends DeltaTableWriteSuiteBase with TestUt
engine,
kernelTablePath,
isNewTable = true,
schema = new StructType().add("id", INTEGER).add(PARTITION_COLUMN, INTEGER),
schema = new StructType().add("id", LONG).add(PARTITION_COLUMN, LONG),
partCols = Seq(PARTITION_COLUMN)).commit(engine, emptyIterable())
.getPostCommitHooks
.forEach(hook => hook.threadSafeInvoke(engine))
spark.sql(
s"CREATE OR REPLACE TABLE delta.`${sparkTablePath}` " +
s"(id Integer, part Integer) USING DELTA PARTITIONED BY (part)")
s"(id LONG, part LONG) USING DELTA PARTITIONED BY (part)")
assertChecksumEquals(engine, sparkTablePath, kernelTablePath, 0)

(1 to 10).foreach(version =>
insertIntoPartitionedTableAndCheckCrc(engine, sparkTablePath, kernelTablePath, version))
(1 to 10).foreach { version =>
spark.range(0, version).withColumn(PARTITION_COLUMN, col("id") % 2)
.write.format("delta").mode("append").save(sparkTablePath)
commitSparkChangeToKernel(kernelTablePath, engine, sparkTablePath, version)
assertChecksumEquals(engine, sparkTablePath, kernelTablePath, version)
}
}
}

/**
* Insert into unpartitioned spark table, load the added files from the spark table's commit log,
* commit them to kernel table and verify the checksum files are consistent between spark
* and kernel.
*/
private def insertIntoUnpartitionedTableAndCheckCrc(
engine: Engine,
sparkTablePath: String,
kernelTablePath: String,
versionAtCommit: Long): Unit = {
var valueToAppend = "(0)"
(0L to versionAtCommit).foreach(i => valueToAppend = valueToAppend + s",($i)")
spark.sql(
s"INSERT INTO delta.`$sparkTablePath` values $valueToAppend")

val txn = Table
.forPath(engine, kernelTablePath)
.createTransactionBuilder(engine, "test-engine", Operation.WRITE)
.build(engine)

commitSparkChangeToKernel(txn, engine, sparkTablePath, versionAtCommit)
assertChecksumEquals(engine, sparkTablePath, kernelTablePath, versionAtCommit)
}

/**
* Insert into partitioned spark table, load the added files from the spark table's commit log,
* commit them to kernel table and verify the checksum files are consistent between spark
* and kernel.
*/
private def insertIntoPartitionedTableAndCheckCrc(
engine: Engine,
sparkTablePath: String,
kernelTablePath: String,
versionAtCommit: Long): Unit = {
var valueToAppend = "(0, 0)"
var addedPartition = Set(0)
(0L to versionAtCommit).foreach(i => {
val partitionValue = 2 * i
addedPartition = addedPartition + partitionValue.toInt
valueToAppend = valueToAppend + s",($i, $partitionValue)"
})
spark.sql(
s"INSERT INTO delta.`$sparkTablePath` values $valueToAppend")

val txn = Table
.forPath(engine, kernelTablePath)
.createTransactionBuilder(engine, "test-engine", Operation.WRITE)
.build(engine)

commitSparkChangeToKernel(
txn,
engine,
sparkTablePath,
versionAtCommit)
assertChecksumEquals(engine, sparkTablePath, kernelTablePath, versionAtCommit)
}

private def assertChecksumEquals(
engine: Engine,
sparkTablePath: String,
Expand Down Expand Up @@ -188,16 +137,22 @@ class ChecksumSimpleComparisonSuite extends DeltaTableWriteSuiteBase with TestUt
assert(crc1.getMetadata.getPartitionColNames === crc2.getMetadata.getPartitionColNames)
}

// TODO docs
private def commitSparkChangeToKernel(
txn: Transaction,
path: String,
engine: Engine,
sparkTablePath: String,
versionToConvert: Long): Unit = {

val txn = Table.forPath(engine, path)
.createTransactionBuilder(engine, "test-engine", Operation.WRITE)
.build(engine)

val tableChange = Table.forPath(engine, sparkTablePath).asInstanceOf[TableImpl].getChanges(
engine,
versionToConvert,
versionToConvert,
// TODO include REMOVE action as well once we support it
Set(DeltaAction.ADD).asJava)

val addFilesRows = new util.ArrayList[Row]()
Expand Down

0 comments on commit aae9d64

Please sign in to comment.