Skip Anchored and derived features if the feature data is unavailable #1026

Merged · 9 commits · Feb 3, 2023
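The PR gates the new behavior behind a skip-missing-feature switch read from the job's Spark conf (FeathrUtils.SKIP_MISSING_FEATURE; local runs additionally consult LocalFeatureJoinJob.SKIP_MISSING_FEATURE). A hedged sketch of how a job might opt in; the literal conf key below is an assumption, not confirmed by this diff:

import org.apache.spark.sql.SparkSession

// Hypothetical opt-in; "spark.feathr.skip.missing.feature" is an assumed key name.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("feathr-join-skip-missing")
  .config("spark.feathr.skip.missing.feature", "true")
  .getOrCreate()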
@@ -2,12 +2,13 @@ package com.linkedin.feathr.offline.client

import com.linkedin.feathr.common.exception._
import com.linkedin.feathr.common.{FeatureInfo, Header, InternalApi, JoiningFeatureParams, RichConfig, TaggedFeatureName}
import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource
import com.linkedin.feathr.offline.config.sources.FeatureGroupsUpdater
import com.linkedin.feathr.offline.config.{FeathrConfig, FeathrConfigLoader, FeatureGroupsGenerator, FeatureJoinConfig}
import com.linkedin.feathr.offline.generation.{DataFrameFeatureGenerator, FeatureGenKeyTagAnalyzer, StreamingFeatureGenerator}
import com.linkedin.feathr.offline.job._
import com.linkedin.feathr.offline.join.DataFrameFeatureJoiner
import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlanner}
import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan, MultiStageJoinPlanner}
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
import com.linkedin.feathr.offline.source.DataSource
import com.linkedin.feathr.offline.source.accessor.DataPathHandler
@@ -235,17 +236,29 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
*/
val updatedFeatureGroups = featureGroupsUpdater.updateFeatureGroups(featureGroups, keyTaggedFeatures)

val logicalPlan = logicalPlanner.getLogicalPlan(updatedFeatureGroups, keyTaggedFeatures)

var logicalPlan = logicalPlanner.getLogicalPlan(updatedFeatureGroups, keyTaggedFeatures)
val shouldSkipFeature = FeathrUtils.getFeathrJobParam(sparkSession.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean
val featureToPathsMap = (for {
requiredFeature <- logicalPlan.allRequiredFeatures
featureAnchorWithSource <- allAnchoredFeatures.get(requiredFeature.getFeatureName)
} yield (requiredFeature.getFeatureName -> featureAnchorWithSource.source.path)).toMap
if (!sparkSession.sparkContext.isLocal) {
// Check read authorization for all required features
AclCheckUtils.checkReadAuthorization(sparkSession, logicalPlan.allRequiredFeatures, allAnchoredFeatures) match {
case Failure(exception) =>
throw new FeathrInputDataException(
ErrorLabel.FEATHR_USER_ERROR,
"Unable to verify " +
"read authorization on feature data, it can be due to the following reasons: 1) input not exist, 2) no permission.",
exception)
val featurePathsTest = AclCheckUtils.checkReadAuthorization(sparkSession, logicalPlan.allRequiredFeatures, allAnchoredFeatures)
featurePathsTest._1 match {
case Failure(exception) => // If skipping is enabled, remove the anchored features with missing data from the feature groups and produce a new logical plan
if (shouldSkipFeature || (sparkSession.sparkContext.isLocal &&
SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) {
val featureGroupsWithoutInvalidFeatures = FeatureGroupsUpdater()
.getUpdatedFeatureGroupsWithoutInvalidPaths(featureToPathsMap, updatedFeatureGroups, featurePathsTest._2)
logicalPlan = logicalPlanner.getLogicalPlan(featureGroupsWithoutInvalidFeatures, keyTaggedFeatures)
} else {
throw new FeathrInputDataException(
ErrorLabel.FEATHR_USER_ERROR,
"Unable to verify " +
"read authorization on feature data, it can be due to the following reasons: 1) input not exist, 2) no permission.",
exception)
}
case Success(_) => log.debug("Checked read authorization on all feature data")
}
}
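Conceptually, the new check either prunes the unreadable features and re-plans, or fails the job as before. A minimal, self-contained sketch of that control flow with toy types (Plan, checkPaths, and resolvePlan are illustrative names, not Feathr APIs):

import scala.util.{Failure, Success, Try}

final case class Plan(features: Set[String])

// Returns the check result plus the list of features whose data is unreadable.
def checkPaths(required: Set[String], readable: Set[String]): (Try[Unit], Seq[String]) = {
  val invalid = (required -- readable).toSeq.sorted
  if (invalid.isEmpty) (Success(()), Seq.empty)
  else (Failure(new IllegalStateException(s"unreadable feature data: ${invalid.mkString(", ")}")), invalid)
}

// Mirrors the match above: Failure plus skipMissing yields a pruned plan; Failure otherwise rethrows.
def resolvePlan(initial: Plan, readable: Set[String], skipMissing: Boolean): Plan =
  checkPaths(initial.features, readable) match {
    case (Failure(error), invalid) =>
      if (skipMissing) Plan(initial.features -- invalid) // prune and re-plan
      else throw error                                   // previous behavior: fail the job
    case (Success(_), _) => initial
  }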
@@ -1,8 +1,11 @@
package com.linkedin.feathr.offline.config.sources

import com.linkedin.feathr.common.JoiningFeatureParams
import com.linkedin.feathr.offline.{FeatureDataFrame, JoinKeys}
import com.linkedin.feathr.offline.anchored.anchorExtractor.TimeWindowConfigurableAnchorExtractor
import com.linkedin.feathr.offline.logical.FeatureGroups
import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource
import com.linkedin.feathr.offline.job.FeatureJoinJob.{FeatureName, log}
import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan}

/**
* Feature groups will be generated using only the feature def config using the [[com.linkedin.feathr.offline.config.FeatureGroupsGenerator]]
@@ -70,6 +73,82 @@ private[offline] class FeatureGroupsUpdater {
featureGroups.allSeqJoinFeatures)
}

/**
* Update the feature groups (for feature generation) when feature data is missing. Some anchored features can be missing if their
* feature data is not present. Remove those anchored features, along with the derived features that depend on them.
* @param featureGroups the original feature groups
* @param allStageFeatures map from feature name to the generated feature DataFrame and its join keys
* @param keyTaggedFeatures the originally requested key-tagged features
* @return the updated feature groups and the filtered key-tagged features
*/
def getUpdatedFeatureGroups(featureGroups: FeatureGroups, allStageFeatures: Map[FeatureName, (FeatureDataFrame, JoinKeys)],
keyTaggedFeatures: Seq[JoiningFeatureParams]): (FeatureGroups, Seq[JoiningFeatureParams]) = {
val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureRow => allStageFeatures.contains(featureRow._1))
// Iterate over the derived features and remove those that consume the missing anchored features.
val updatedDerivedFeatures = featureGroups.allDerivedFeatures.filter(derivedFeature => {
// Find the constituent anchored features for every derived feature
val allAnchoredFeaturesInDerived = derivedFeature._2.consumedFeatureNames.map(_.getFeatureName)
val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived.map(feature => updatedAnchoredFeatures.contains(feature))
!containsFeature.contains(false)
})
val updatedSeqJoinFeature = featureGroups.allSeqJoinFeatures.filter(seqJoinFeature => {
// Find the constituent anchored features for every seq join feature
val allAnchoredFeaturesInDerived = seqJoinFeature._2.consumedFeatureNames.map(_.getFeatureName)
val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived.map(feature => updatedAnchoredFeatures.contains(feature))
!containsFeature.contains(false)
})
val updatedWindowAggFeatures = featureGroups.allWindowAggFeatures.filter(windowAggFeature => updatedAnchoredFeatures.contains(windowAggFeature._1))

log.warn(s"Removed the following features:- ${featureGroups.allAnchoredFeatures.keySet.diff(updatedAnchoredFeatures.keySet)}," +
s"${featureGroups.allDerivedFeatures.keySet.diff(updatedDerivedFeatures.keySet)}," +
s" ${featureGroups.allSeqJoinFeatures.keySet.diff(updatedSeqJoinFeature.keySet)}")
val updatedFeatureGroups = FeatureGroups(updatedAnchoredFeatures, updatedDerivedFeatures, updatedWindowAggFeatures,
featureGroups.allPassthroughFeatures, updatedSeqJoinFeature)
val updatedKeyTaggedFeatures = keyTaggedFeatures.filter(feature => updatedAnchoredFeatures.contains(feature.featureName)
|| updatedDerivedFeatures.contains(feature.featureName) || updatedWindowAggFeatures.contains(feature.featureName)
|| featureGroups.allPassthroughFeatures.contains(feature.featureName) || updatedSeqJoinFeature.contains(feature.featureName))
(updatedFeatureGroups, updatedKeyTaggedFeatures)
}

/**
* Exclude anchored and derived features from the join stage if they do not have a valid path.
* @param featureToPathsMap Map of anchored feature names to their source paths
* @param featureGroups All feature groups
* @param invalidPaths List of all invalid paths
* @return Feature groups without the features anchored on invalid paths
*/
def getUpdatedFeatureGroupsWithoutInvalidPaths(featureToPathsMap: Map[String, String], featureGroups: FeatureGroups, invalidPaths: Seq[String]): FeatureGroups = {
val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureNameToAnchoredObject => {
!invalidPaths.contains(featureToPathsMap(featureNameToAnchoredObject._1))
})

// Iterate over the derived features and remove those that consume anchored features with invalid paths.
val updatedDerivedFeatures = featureGroups.allDerivedFeatures.filter(derivedFeature => {
// Find the constituent anchored features for every derived feature
val allAnchoredFeaturesInDerived = derivedFeature._2.consumedFeatureNames.map(_.getFeatureName)
// Check if any of the features does not have a valid path
val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived
.map(featureName => !invalidPaths.contains(featureToPathsMap(featureName)))
!containsFeature.contains(false)
})

// Iterate over the seq join features and remove those that consume anchored features with invalid paths.
val updatedSeqJoinFeatures = featureGroups.allSeqJoinFeatures.filter(seqJoinFeature => {
// Find the constituent anchored features for every seq join feature
val allAnchoredFeaturesInDerived = seqJoinFeature._2.consumedFeatureNames.map(_.getFeatureName)
// Check if any of the features does not have a valid path
val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived
.map(featureName => !invalidPaths.contains(featureToPathsMap(featureName)))
!containsFeature.contains(false)
})

log.warn(s"Removed the following features:- ${featureGroups.allAnchoredFeatures.keySet.diff(updatedAnchoredFeatures.keySet)}," +
s"${featureGroups.allDerivedFeatures.keySet.diff(updatedDerivedFeatures.keySet)}," +
s" ${featureGroups.allSeqJoinFeatures.keySet.diff(updatedSeqJoinFeatures.keySet)}")
FeatureGroups(updatedAnchoredFeatures, updatedDerivedFeatures,
featureGroups.allWindowAggFeatures, featureGroups.allPassthroughFeatures, updatedSeqJoinFeatures)
}

}
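Both helpers above share one survival rule: an anchored feature survives if its source path is valid, and a derived or seq join feature survives only if every feature it consumes survived. A toy run with hypothetical names and paths (forall is equivalent to the !containsFeature.contains(false) check in the code):

// Anchored features survive when their source path is not invalid.
val featureToPathsMap = Map("f1" -> "/data/f1/daily", "f2" -> "/data/f2/daily")
val invalidPaths = Seq("/data/f2/daily")
val survivingAnchored = featureToPathsMap.filter { case (_, path) => !invalidPaths.contains(path) }.keySet
// A derived feature survives only if all of its consumed features survived.
val derived = Map("d1" -> Seq("f1"), "d2" -> Seq("f1", "f2"))
val survivingDerived = derived.filter { case (_, consumed) => consumed.forall(survivingAnchored.contains) }
// survivingAnchored == Set("f1"); survivingDerived == Map("d1" -> Seq("f1"))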

@@ -217,9 +217,13 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession,
val ss = SparkSession.builder().getOrCreate()
val failOnMissingPartition = FeathrUtils.getFeathrJobParam(ss, FeathrUtils.FAIL_ON_MISSING_PARTITION).toBoolean
val anchorDFMap1 = anchorToDataSourceMapper.getBasicAnchorDFMapForJoin(ss, Seq(featureAnchor), failOnMissingPartition)
val updatedAnchorDFMap = anchorDFMap1.filter(anchorEntry => anchorEntry._2.isDefined)
.map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get)
// We don't need to check whether the anchored feature's DataFrames are missing (due to skipping missing features), as such
// seq join features have already been removed in FeatureGroupsUpdater#getUpdatedFeatureGroupsWithoutInvalidPaths.
val featureInfo = FeatureTransformation.directCalculate(
anchorGroup: AnchorFeatureGroups,
anchorDFMap1(featureAnchor),
updatedAnchorDFMap(featureAnchor),
featureAnchor.featureAnchor.sourceKeyExtractor,
None,
None,
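Here, as in the generation and streaming paths below, optional accessors are unwrapped with filter(_._2.isDefined) followed by .get. For reference, collect is a behavior-preserving, idiomatic equivalent; a sketch with toy types standing in for the anchor and accessor types:

val anchorDFMap: Map[String, Option[String]] = Map("anchorA" -> Some("dfA"), "anchorB" -> None)
val updatedAnchorDFMap: Map[String, String] = anchorDFMap.collect { case (k, Some(v)) => k -> v }
// updatedAnchorDFMap == Map("anchorA" -> "dfA")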
@@ -3,13 +3,16 @@ package com.linkedin.feathr.offline.generation
import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrException}
import com.linkedin.feathr.common.{Header, JoiningFeatureParams, TaggedFeatureName}
import com.linkedin.feathr.offline
import com.linkedin.feathr.offline.{FeatureDataFrame, JoinKeys}
import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource.{getDefaultValues, getFeatureTypes}
import com.linkedin.feathr.offline.config.sources.FeatureGroupsUpdater
import com.linkedin.feathr.offline.derived.functions.SeqJoinDerivationFunction
import com.linkedin.feathr.offline.derived.strategies.{DerivationStrategies, RowBasedDerivation, SequentialJoinDerivationStrategy, SparkUdfDerivation, SqlDerivationSpark}
import com.linkedin.feathr.offline.derived.{DerivedFeature, DerivedFeatureEvaluator}
import com.linkedin.feathr.offline.evaluator.DerivedFeatureGenStage
import com.linkedin.feathr.offline.job.FeatureJoinJob.FeatureName
import com.linkedin.feathr.offline.job.{FeatureGenSpec, FeatureTransformation}
import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan}
import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan, MultiStageJoinPlanner}
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
import com.linkedin.feathr.offline.source.accessor.DataPathHandler
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
@@ -32,6 +35,7 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan

/**
* Generate anchored and derived features and return the feature DataFrame and feature metadata.
*
* @param ss input spark session.
* @param featureGenSpec specification for a feature generation job.
* @param featureGroups all features in scope grouped under different types.
@@ -63,6 +67,9 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan
val requiredRegularFeatureAnchorsWithTime = allRequiredFeatureAnchorWithSourceAndTime.values.toSeq
val anchorDFRDDMap = anchorToDataFrameMapper.getAnchorDFMapForGen(ss, requiredRegularFeatureAnchorsWithTime, Some(incrementalAggContext), failOnMissingPartition)

// Drop anchors whose feature data could not be loaded; if none remain, there is nothing to generate.
val updatedAnchorDFRDDMap = anchorDFRDDMap.filter(anchorEntry => anchorEntry._2.isDefined).map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get)
if (updatedAnchorDFRDDMap.isEmpty) return Map()

// 3. Load user specified default values and feature types, if any.
val featureToDefaultValueMap = getDefaultValues(allRequiredFeatureAnchorWithSourceAndTime.values.toSeq)
val featureToTypeConfigMap = getFeatureTypes(allRequiredFeatureAnchorWithSourceAndTime.values.toSeq)
@@ -72,15 +79,21 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan
case (_: Seq[Int], featureNames: Seq[String]) =>
val (anchoredFeatureNamesThisStage, _) = featureNames.partition(featureGroups.allAnchoredFeatures.contains)
val anchoredFeaturesThisStage = featureNames.filter(featureGroups.allAnchoredFeatures.contains).map(allRequiredFeatureAnchorWithSourceAndTime).distinct
val anchoredDFThisStage = anchorDFRDDMap.filterKeys(anchoredFeaturesThisStage.toSet)
val anchoredDFThisStage = updatedAnchorDFRDDMap.filterKeys(anchoredFeaturesThisStage.toSet)

FeatureTransformation
.transformFeatures(anchoredDFThisStage, anchoredFeatureNamesThisStage, None, Some(incrementalAggContext), mvelContext)
.map(f => (f._1, (offline.FeatureDataFrame(f._2.transformedResult.df, f._2.transformedResult.inferredFeatureTypes), f._2.joinKey)))
}.toMap

// 5. Group features based on grouping specified in output processors
val groupedAnchoredFeatures = featureGenFeatureGrouper.group(allStageFeatures, featureGenSpec.getOutputProcessorConfigs, featureGroups.allDerivedFeatures)
val updatedAllStageFeatures = allStageFeatures.filter(keyValue => !keyValue._2._1.df.isEmpty)
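// Rebuild the feature groups and the logical plan so derived and seq join stages no longer reference skipped features.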
val (updatedFeatureGroups, updatedKeyTaggedFeatures) = FeatureGroupsUpdater().getUpdatedFeatureGroups(featureGroups,
updatedAllStageFeatures, keyTaggedFeatures)

val updatedLogicalPlan = MultiStageJoinPlanner().getLogicalPlan(updatedFeatureGroups, updatedKeyTaggedFeatures)
val groupedAnchoredFeatures = featureGenFeatureGrouper.group(updatedAllStageFeatures, featureGenSpec.getOutputProcessorConfigs,
updatedFeatureGroups.allDerivedFeatures)

// 6. Substitute defaults at this stage since all anchored features are generated and grouped together.
// Substitute before generating derived features.
@@ -89,9 +102,9 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan

// 7. Calculate derived features.
val derivedFeatureEvaluator = getDerivedFeatureEvaluatorInstance(ss, featureGroups)
val derivedFeatureGenerator = DerivedFeatureGenStage(featureGroups, logicalPlan, derivedFeatureEvaluator)
val derivedFeatureGenerator = DerivedFeatureGenStage(updatedFeatureGroups, updatedLogicalPlan, derivedFeatureEvaluator)

val derivationsEvaluatedFeatures = (logicalPlan.joinStages ++ logicalPlan.convertErasedEntityTaggedToJoinStage(logicalPlan.postJoinDerivedFeatures))
val derivationsEvaluatedFeatures = (updatedLogicalPlan.joinStages ++ updatedLogicalPlan.convertErasedEntityTaggedToJoinStage(updatedLogicalPlan.postJoinDerivedFeatures))
.foldLeft(defaultSubstitutedFeatures)((accFeatureData, currentStage) => {
val (keyTags, featureNames) = currentStage
val derivedFeatureNamesThisStage = featureNames.filter(featureGroups.allDerivedFeatures.contains)
@@ -56,7 +56,11 @@ class StreamingFeatureGenerator(dataPathHandlers: List[DataPathHandler]) {
}
// Load the raw streaming source data
val anchorDfRDDMap = anchorToDataFrameMapper.getAnchorDFMapForGen(ss, anchors, None, false, true)
anchorDfRDDMap.par.map { case (anchor, dfAccessor) => {

// Remove entries for which feature dataframe cannot be loaded.
val updatedAnchorDFRDDMap = anchorDfRDDMap.filter(anchorEntry => anchorEntry._2.isDefined).map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get)

updatedAnchorDFRDDMap.par.map { case (anchor, dfAccessor) => {
val schemaStr = anchor.source.location.asInstanceOf[KafkaEndpoint].schema.avroJson
val schemaStruct = SchemaConverters.toSqlType(Schema.parse(schemaStr)).dataType.asInstanceOf[StructType]
val rowForRecord = (input: Any) => {
@@ -231,17 +231,16 @@ object FeatureGenJob {
sparkSession,
allAnchoredFeatures.values.toSeq,
failOnMissing)
val updatedAnchorsWithSource = anchorsWithSource.filter(anchorEntry => anchorEntry._2.isDefined).map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get)
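// If every anchor was skipped, return an empty DataFrame for each requested feature so the caller can proceed.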
if (updatedAnchorsWithSource.isEmpty) return featureNamesInAnchorSet.asScala.map(featureName => featureName -> sparkSession.emptyDataFrame).toMap.asJava

// Only load DataFrames for anchors that have a preprocessing UDF,
// so we filter out anchors that don't have preprocessing UDFs.
// We use the feature names, sorted and comma-joined, as the key to find the anchor.
// For example, if f1 and f2 belong to an anchor, the key is Map("f1,f2" -> anchor)
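// e.g. Seq("f2", "f1").sorted.mkString(",") == "f1,f2"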
val dataFrameMapForPreprocessing = anchorsWithSource
updatedAnchorsWithSource
.filter(x => featureNamesInAnchorSet.contains(x._1.featureAnchor.features.toSeq.sorted.mkString(",")))
.map(x => (x._1.featureAnchor.features.toSeq.sorted.mkString(","), x._2.get()))

// PySpark only understands Java maps, so we need to convert the Scala map back to a Java map.
dataFrameMapForPreprocessing.asJava
.map(x => (x._1.featureAnchor.features.toSeq.sorted.mkString(","), x._2.get())).asJava
}

def prepareSparkSession(args: Array[String]): FeathrGenPreparationInfo = {