From 7b773fc2d66d32b6a4d13ca8adaf231dea186dcd Mon Sep 17 00:00:00 2001
From: Sagar Sumit
Date: Sun, 24 Nov 2024 22:13:32 +0530
Subject: [PATCH] [HUDI-8570] Use secondary index only for snapshot queries
 (#12322)

Use secondary index only for snapshot queries. Skip the secondary
index and fall back to the regular query path for query types such as
time travel and incremental queries.
---
 .../command/index/TestSecondaryIndex.scala    | 142 ++++++++++++++----
 .../hudi/common/HoodieSparkSqlTestBase.scala  |  14 ++
 2 files changed, 128 insertions(+), 28 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
index 99c1a53a00981..4ecceee9fdbd1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -20,18 +20,20 @@ package org.apache.spark.sql.hudi.command.index
 
 import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, INSERT_OPERATION_OPT_VAL, OPERATION, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE, UPSERT_OPERATION_OPT_VAL}
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils}
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCommitMetadata
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieWriteConfig}
 import org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY_SEPARATOR
 import org.apache.hudi.metadata.SecondaryIndexKeyUtils
+import org.apache.hudi.storage.StoragePath
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 
 import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConverters._
@@ -224,18 +226,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
       val basePath = s"${tmp.getCanonicalPath}/$tableName"
       // Step 1: Initial Insertion of Records
       val dataGen = new HoodieTestDataGenerator()
-      val initialRecords = recordsToStrings(dataGen.generateInserts(getInstantTime, 50, true)).asScala
-      val initialDf = spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2))
-      val hudiOpts = commonOpts ++ Map(TABLE_TYPE.key -> "MERGE_ON_READ", HoodieWriteConfig.TBL_NAME.key -> tableName)
-      initialDf.write.format("hudi")
-        .options(hudiOpts)
-        .option(OPERATION.key, INSERT_OPERATION_OPT_VAL)
-        .mode(SaveMode.Overwrite)
-        .save(basePath)
-
-      // Step 2: Create table and secondary index on 'rider' column
-      spark.sql(s"CREATE TABLE $tableName USING hudi LOCATION '$basePath'")
-      spark.sql(s"create index idx_rider on $tableName using secondary_index(rider)")
+      val hudiOpts: Map[String, String] = loadInitialBatchAndCreateSecondaryIndex(tableName, basePath, dataGen)
 
       // Verify initial state of secondary index
       val initialKeys = spark.sql(s"select _row_key from $tableName limit 5").collect().map(_.getString(0))
spark.sql(s"select _row_key from $tableName limit 5").collect().map(_.getString(0)) @@ -287,7 +278,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase { validateSecondaryIndex(basePath, tableName, updateKeys) // Step 6: Perform Deletes on Records and Validate Secondary Index - val deleteKeys = initialKeys.take(3) // pick a subset of keys to delete + val deleteKeys = initialKeys.take(1) // pick a subset of keys to delete val deleteDf = spark.read.format("hudi").load(basePath).filter(s"_row_key in ('${deleteKeys.mkString("','")}')") deleteDf.write.format("hudi") .options(hudiOpts) @@ -329,18 +320,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase { val basePath = s"${tmp.getCanonicalPath}/$tableName" // Step 1: Initial Insertion of Records val dataGen = new HoodieTestDataGenerator() - val initialRecords = recordsToStrings(dataGen.generateInserts(getInstantTime, 50, true)).asScala - val initialDf = spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2)) - val hudiOpts = commonOpts ++ Map(TABLE_TYPE.key -> "MERGE_ON_READ", HoodieWriteConfig.TBL_NAME.key -> tableName) - initialDf.write.format("hudi") - .options(hudiOpts) - .option(OPERATION.key, INSERT_OPERATION_OPT_VAL) - .mode(SaveMode.Overwrite) - .save(basePath) - - // Step 2: Create table and secondary index on 'rider' column - spark.sql(s"CREATE TABLE $tableName USING hudi LOCATION '$basePath'") - spark.sql(s"create index idx_rider on $tableName using secondary_index(rider)") + val hudiOpts: Map[String, String] = loadInitialBatchAndCreateSecondaryIndex(tableName, basePath, dataGen) // Verify initial state of secondary index val initialKeys = spark.sql(s"select _row_key from $tableName limit 5").collect().map(_.getString(0)) @@ -354,7 +334,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase { .options(hudiOpts) .option(OPERATION.key, operationType) .mode(SaveMode.Append) - .save(basePath)) ( + .save(basePath))( "Can not perform operation " + WriteOperationType.fromValue(operationType) + " on secondary index") // disable secondary index and retry df.write.format("hudi") @@ -363,11 +343,117 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase { .option(OPERATION.key, operationType) .mode(SaveMode.Append) .save(basePath) + dataGen.close() } } } } + test("Test Secondary Index With Time Travel Query") { + if (HoodieSparkUtils.gteqSpark3_3) { + withTempDir { tmp => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + // Step 1: Initial Insertion of Records + val dataGen = new HoodieTestDataGenerator() + val numInserts = 5 + val hudiOpts: Map[String, String] = loadInitialBatchAndCreateSecondaryIndex(tableName, basePath, dataGen, numInserts) + + // Verify initial state of secondary index + val initialKeys = spark.sql(s"select _row_key from $tableName limit 5").collect().map(_.getString(0)) + validateSecondaryIndex(basePath, tableName, initialKeys) + + // Step 3: Perform Update Operations on Subset of Records + val updateRecords = recordsToStrings(dataGen.generateUniqueUpdates(getInstantTime, 1, HoodieTestDataGenerator.TRIP_FLATTENED_SCHEMA)).asScala + val updateDf = spark.read.json(spark.sparkContext.parallelize(updateRecords.toSeq, 1)) + val updateKeys = updateDf.select("_row_key").collect().map(_.getString(0)) + val recordKeyToUpdate = updateKeys.head + val initialSecondaryKey = spark.sql( + s"SELECT key FROM hudi_metadata('$basePath') WHERE type=7 AND key LIKE '%$SECONDARY_INDEX_RECORD_KEY_SEPARATOR$recordKeyToUpdate'" + ).collect().map(indexKey => 
+        // update the record
+        updateDf.write.format("hudi")
+          .options(hudiOpts)
+          .option(OPERATION.key, UPSERT_OPERATION_OPT_VAL)
+          .mode(SaveMode.Append)
+          .save(basePath)
+        // Verify secondary index after updates
+        validateSecondaryIndex(basePath, tableName, updateKeys)
+
+        // Step 4: Perform Time Travel Query
+        // get the first instant on the timeline
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(basePath)
+          .setConf(HoodieTestUtils.getDefaultStorageConf)
+          .build()
+        val firstInstant = metaClient.reloadActiveTimeline().filterCompletedInstants().firstInstant().get()
+        // do a time travel query with data skipping enabled
+        val readOpts = hudiOpts ++ Map(
+          HoodieMetadataConfig.ENABLE.key -> "true",
+          DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+        )
+        val timeTravelDF = spark.read.format("hudi")
+          .options(readOpts)
+          .option("as.of.instant", firstInstant.requestedTime)
+          .load(basePath)
+        assertEquals(numInserts, timeTravelDF.count())
+        // updated record should still show in time travel view
+        assertEquals(1, timeTravelDF.where(s"_row_key = '$recordKeyToUpdate'").count())
+        // rider field (secondary key) should point to previous value
+        val secondaryKey = timeTravelDF.where(s"_row_key = '$recordKeyToUpdate'").select("rider").collect().head.getString(0)
+        assertEquals(initialSecondaryKey, secondaryKey)
+
+        // Perform Deletes on Records and Validate Secondary Index
+        val deleteDf = spark.read.format("hudi").load(basePath).filter(s"_row_key in ('${updateKeys.mkString("','")}')")
+        // Get fileId for the delete record
+        val deleteFileId = deleteDf.select("_hoodie_file_name").collect().head.getString(0)
+        deleteDf.write.format("hudi")
+          .options(hudiOpts)
+          .option(OPERATION.key, DELETE_OPERATION_OPT_VAL)
+          .mode(SaveMode.Append)
+          .save(basePath)
+        // Verify secondary index for deletes
+        validateSecondaryIndex(basePath, tableName, updateKeys, hasDeleteKeys = true)
+        // Corrupt the data file that was written for the delete key in the first instant
+        val firstCommitMetadata = deserializeCommitMetadata(metaClient.reloadActiveTimeline().getInstantDetails(firstInstant).get())
+        val partitionToWriteStats = firstCommitMetadata.getPartitionToWriteStats.asScala.mapValues(_.asScala.toList)
+        // Find the path for the given fileId
+        val matchingPath: Option[String] = partitionToWriteStats.values.flatten
+          .find(_.getFileId == deleteFileId)
+          .map(_.getPath)
+        assertTrue(matchingPath.isDefined)
+        // Corrupt the data file
+        val dataFile = new StoragePath(basePath, matchingPath.get)
+        val storage = metaClient.getStorage
+        storage.deleteFile(dataFile)
+        storage.createNewFile(dataFile)
+        // Time travel query should now throw an exception
+        checkExceptionContain(() => spark.read.format("hudi")
+          .options(readOpts)
+          .option("as.of.instant", firstInstant.requestedTime)
+          .load(basePath).count())(s"${dataFile.toString} is not a Parquet file")
+
+        dataGen.close()
+      }
+    }
+  }
+
+  private def loadInitialBatchAndCreateSecondaryIndex(tableName: String, basePath: String, dataGen: HoodieTestDataGenerator, numInserts: Integer = 50) = {
+    val initialRecords = recordsToStrings(dataGen.generateInserts(getInstantTime, numInserts, true)).asScala
+    val initialDf = spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2))
+    val hudiOpts = commonOpts ++ Map(TABLE_TYPE.key -> "MERGE_ON_READ", HoodieWriteConfig.TBL_NAME.key -> tableName)
+    initialDf.write.format("hudi")
+      .options(hudiOpts)
+      .option(OPERATION.key, INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    // Step 2: Create table and secondary index on 'rider' column
+    spark.sql(s"CREATE TABLE $tableName USING hudi LOCATION '$basePath'")
+    spark.sql(s"create index idx_rider on $tableName using secondary_index(rider)")
+    hudiOpts
+  }
+
   private def validateSecondaryIndex(basePath: String, tableName: String, recordKeys: Array[String], hasDeleteKeys: Boolean = false): Unit = {
     // Check secondary index metadata for the selected keys
     recordKeys.foreach { key =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index b0b89a1718895..1f4d3e42a8d60 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -166,6 +166,20 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
     assertResult(true)(hasException)
   }
 
+  protected def checkExceptionContain(runnable: Runnable)(errorMsg: String): Unit = {
+    var hasException = false
+    try {
+      runnable.run()
+    } catch {
+      case e: Throwable if checkMessageContains(e, errorMsg) || checkMessageContains(getRootCause(e), errorMsg) =>
+        hasException = true
+
+      case f: Throwable =>
+        fail("Exception should contain: " + errorMsg + ", error message: " + f.getMessage, f)
+    }
+    assertResult(true)(hasException)
+  }
+
   protected def checkExceptionContain(sql: String)(errorMsg: String): Unit = {
     var hasException = false
     try {
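
Note (reviewer sketch, not part of the patch): the new Runnable-based
checkExceptionContain overload lets tests assert on exceptions thrown by
arbitrary actions rather than only SQL strings; a Scala `() => ...` literal
is SAM-converted to java.lang.Runnable. A minimal usage sketch, mirroring
the time travel test in this patch (readOpts, firstInstant, and basePath
are assumed to be set up as in that test):

    // Expect the time travel read over the corrupted data file to fail;
    // the message fragment is matched against the thrown exception and
    // its root cause.
    checkExceptionContain(() => spark.read.format("hudi")
      .options(readOpts)
      .option("as.of.instant", firstInstant.requestedTime)
      .load(basePath)
      .count())("is not a Parquet file")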