Add optional retry mechanism if data loading fails #1034

Merged: 8 commits, Feb 6, 2023
Changes from 1 commit
@@ -23,10 +23,16 @@ object LocalFeatureJoinJob {
val ss: SparkSession = createSparkSession(enableHiveSupport = true)
val SKIP_MISSING_FEATURE = SQLConf
.buildConf("spark.feathr.skip.missing.feature")
.doc("Whether to use the V2 implementation, which should have better performance.")
.doc("Whether to skip features if data is missing.")
.booleanConf
.createWithDefault(false)

val MAX_DATA_LOAD_RETRY = SQLConf
.buildConf("spark.feathr.max.data.load.retry")
.doc("Number of retries if data is missing.")
.intConf
.createWithDefault(0)

/**
* local debug API, used in unit test and local debug
*
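The new MAX_DATA_LOAD_RETRY entry defaults to 0, so retries stay disabled unless explicitly enabled. For local runs and unit tests the value can be toggled through SQLConf; a minimal sketch, mirroring the test added later in this diff (the surrounding local Spark session setup is assumed):

import org.apache.spark.sql.internal.SQLConf
import com.linkedin.feathr.offline.job.LocalFeatureJoinJob

// Enable up to 3 retries for a local run, then restore the default of 0.
SQLConf.get.setConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY, 3)
// ... run the local feature join / data load here ...
SQLConf.get.setConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY, 0)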
@@ -4,11 +4,15 @@ import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrInputDataException}
import com.linkedin.feathr.offline.config.location.DataLocation
import com.linkedin.feathr.offline.generation.SparkIOUtils
import com.linkedin.feathr.offline.job.DataSourceUtils.getSchemaFromAvroDataFile
import com.linkedin.feathr.offline.job.LocalFeatureJoinJob
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption
import com.linkedin.feathr.offline.util.FeathrUtils
import com.linkedin.feathr.offline.util.FeathrUtils.DATA_LOAD_WAIT_IN_MS
import org.apache.avro.Schema
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
@@ -17,7 +21,10 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
* @param path input data path
*/
private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, dataLoaderHandlers: List[DataLoaderHandler]) extends DataLoader {
val retryWaitTime = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DATA_LOAD_WAIT_IN_MS).toInt

val initialNumOfRetries = if (!ss.sparkContext.isLocal) FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf,
FeathrUtils.MAX_DATA_LOAD_RETRY).toInt else SQLConf.get.getConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY)
/**
* get the schema of the source. It's only used in the deprecated DataSource.getDataSetAndSchema
* @return an Avro Schema
@@ -48,7 +55,9 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
* @return a dataframe
*/
override def loadDataFrame(): DataFrame = {
-  loadDataFrame(Map(), new JobConf(ss.sparkContext.hadoopConfiguration))
+  val retry = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.MAX_DATA_LOAD_RETRY).toInt
+  val retryCount = if (ss.sparkContext.isLocal) SQLConf.get.getConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY) else retry
+  loadDataFrameWithRetry(Map(), new JobConf(ss.sparkContext.hadoopConfiguration), retryCount)
}

/**
@@ -57,7 +66,7 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
* @param jobConf Hadoop JobConf to be passed
* @return a dataframe
*/
-  def loadDataFrame(dataIOParameters: Map[String, String], jobConf: JobConf): DataFrame = {
+  def loadDataFrameWithRetry(dataIOParameters: Map[String, String], jobConf: JobConf, retry: Int): DataFrame = {
val sparkConf = ss.sparkContext.getConf
val inputSplitSize = sparkConf.get("spark.feathr.input.split.size", "")
val dataIOParametersWithSplitSize = Map(SparkIOUtils.SPLIT_SIZE -> inputSplitSize) ++ dataIOParameters
@@ -87,12 +96,17 @@
}
df
} catch {
-      case feathrException: FeathrInputDataException =>
-        println(feathrException.toString)
-        throw feathrException // Throwing exception to avoid dataLoaderHandler hook exception from being swallowed.
-      case e: Throwable => //TODO: Analyze all thrown exceptions, instead of swalling them all, and reading as a csv
-        println(e.toString)
-        ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
+      case _: Throwable =>
+        // If loading from the source failed, retry automatically; the failure might be due to the data source still being written.
+        log.info(s"Loading ${location} failed, retrying for ${retry}-th time..")
+        if (retry > 0) {
+          Thread.sleep(retryWaitTime)
+          loadDataFrameWithRetry(dataIOParameters, jobConf, retry - 1)
+        } else {
+          // Throwing exception to avoid dataLoaderHandler hook exception from being swallowed.
+          throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"Failed to load ${dataPath} after ${initialNumOfRetries} retries" +
+            s" and retry time of ${retryWaitTime}ms.")
+        }
}
}
}
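The reworked catch block drops the old CSV fallback: instead of re-reading the path as CSV, the loader recursively calls loadDataFrameWithRetry, sleeping retryWaitTime between attempts, and only raises FeathrInputDataException once the retry budget is exhausted. The underlying pattern, shown as an illustrative sketch rather than the actual Feathr code (the generic withRetry helper below is hypothetical):

// Hypothetical helper illustrating the retry-with-wait pattern used above:
// run `op`, and on any failure sleep and retry until `attempts` is exhausted.
def withRetry[T](attempts: Int, waitMs: Long)(op: => T): T = {
  try {
    op
  } catch {
    case _: Throwable if attempts > 0 =>
      Thread.sleep(waitMs)
      withRetry(attempts - 1, waitMs)(op)
  }
}

// e.g. withRetry(attempts = 3, waitMs = 1000) { ss.read.parquet(dataPath) }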
@@ -39,6 +39,8 @@ private[offline] object FeathrUtils {
val SPARK_JOIN_MAX_PARALLELISM = "max.parallelism"
val CHECKPOINT_OUTPUT_PATH = "checkpoint.dir"
val SPARK_JOIN_MIN_PARALLELISM = "min.parallelism"
val MAX_DATA_LOAD_RETRY = "max.data.load.retry"
val DATA_LOAD_WAIT_IN_MS = "data.load.wait.in.ms"

val defaultParams: Map[String, String] = Map(
ENABLE_DEBUG_OUTPUT -> "false",
@@ -50,6 +52,8 @@
SEQ_JOIN_ARRAY_EXPLODE_ENABLED -> "true",
ENABLE_SALTED_JOIN -> "false",
SKIP_MISSING_FEATURE -> "false",
MAX_DATA_LOAD_RETRY-> "0",
DATA_LOAD_WAIT_IN_MS-> "1",
// If one key appears more than 0.02% in the dataset, we will salt this join key and split them into multiple partitions
// This is an empirical value
SALTED_JOIN_FREQ_ITEM_THRESHOLD -> "0.0002",
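Both new parameters are plain Feathr job params with conservative defaults (0 retries, 1 ms wait). On a cluster they would be supplied through the Spark configuration; a hedged sketch, assuming the usual "spark.feathr." prefix under which FeathrUtils job params are read:

import org.apache.spark.SparkConf

// Assumed keys: the FeathrUtils job params above, prefixed with "spark.feathr.".
val conf = new SparkConf()
  .set("spark.feathr.max.data.load.retry", "3")       // retry up to 3 times (default 0)
  .set("spark.feathr.data.load.wait.in.ms", "30000")  // wait 30 s between attempts (default 1 ms)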
@@ -1,8 +1,11 @@
package com.linkedin.feathr.offline.source.dataloader

import com.linkedin.feathr.common.exception.{FeathrConfigException, FeathrInputDataException}
import com.linkedin.feathr.offline.TestFeathr
import com.linkedin.feathr.offline.config.location.SimplePath
import com.linkedin.feathr.offline.job.LocalFeatureJoinJob
import org.apache.spark.sql.Row
import org.apache.spark.sql.internal.SQLConf
import org.testng.Assert.assertEquals
import org.testng.annotations.Test

@@ -34,6 +37,20 @@ class TestBatchDataLoader extends TestFeathr {
assertEquals(df.collect(), expectedRows)
}

/**
* Test the batch loader retries before failing.
*/
@Test(expectedExceptions = Array(classOf[FeathrInputDataException]),
expectedExceptionsMessageRegExp = ".* after 3 retries and retry time of 1ms.*")
def testRetry(): Unit = {
SQLConf.get.setConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY, 3)
val path = "anchor11-source.csv"
val batchDataLoader = new BatchDataLoader(ss, location = SimplePath(path), List())
val df = batchDataLoader.loadDataFrame()
df.show()
SQLConf.get.setConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY, 0)
}

@Test(description = "test loading dataframe with BatchDataLoader by specifying delimiter")
def testBatchDataLoaderWithCsvDelimiterOption() : Unit = {
val path = "anchor1-source.tsv"
gradle.properties: 2 changes (1 addition, 1 deletion)
@@ -1,3 +1,3 @@
-version=0.10.4-rc3
+version=0.10.4-rc4
SONATYPE_AUTOMATIC_RELEASE=true
POM_ARTIFACT_ID=feathr_2.12