Skip anchored and derived features #1052

Merged 5 commits on Feb 10, 2023
@@ -110,6 +110,49 @@ private[offline] class FeatureGroupsUpdater {
(updatedFeatureGroups, updatedKeyTaggedFeatures)
}

/**
 * Update the feature groups (for feature join) when some anchored features are missing. Anchored features can be
 * missing if the underlying feature data is not present; remove those anchored features, along with any derived
 * features that depend on them.
 *
 * @param featureGroups original feature groups
 * @param allAnchoredFeaturesWithData names of the anchored features whose data is available
 * @param keyTaggedFeatures key-tagged features requested in the join config
 * @return the pruned feature groups and the pruned key-tagged feature list
 */
def removeMissingFeatures(featureGroups: FeatureGroups, allAnchoredFeaturesWithData: Seq[String],
keyTaggedFeatures: Seq[JoiningFeatureParams]): (FeatureGroups, Seq[JoiningFeatureParams]) = {

// Window aggregation and passthrough features are also treated as anchored features, so add them back in.
val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureRow =>
allAnchoredFeaturesWithData.contains(featureRow._1)) ++ featureGroups.allWindowAggFeatures ++ featureGroups.allPassthroughFeatures

val updatedSeqJoinFeature = featureGroups.allSeqJoinFeatures.filter(seqJoinFeature => {
// Find the constituent anchored features for this sequential join (derived) feature
val allAnchoredFeaturesInDerived = seqJoinFeature._2.consumedFeatureNames.map(_.getFeatureName)
val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived.map(feature => updatedAnchoredFeatures.contains(feature))
!containsFeature.contains(false)
})

// Keep a derived feature only if every feature it consumes (anchored or derived) is still present.
val updatedDerivedFeatures = featureGroups.allDerivedFeatures.filter(derivedFeature => {
// Find the constituent anchored features for every derived feature
val allAnchoredFeaturesInDerived = derivedFeature._2.consumedFeatureNames.map(_.getFeatureName)
val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived.map(feature => updatedAnchoredFeatures.contains(feature)
|| featureGroups.allDerivedFeatures.contains(feature))
!containsFeature.contains(false)
}) ++ updatedSeqJoinFeature

log.warn(s"Removed the following features:- ${featureGroups.allAnchoredFeatures.keySet.diff(updatedAnchoredFeatures.keySet)}," +
s"${featureGroups.allDerivedFeatures.keySet.diff(updatedDerivedFeatures.keySet)}," +
s" ${featureGroups.allSeqJoinFeatures.keySet.diff(updatedSeqJoinFeature.keySet)}")
val updatedFeatureGroups = FeatureGroups(updatedAnchoredFeatures, updatedDerivedFeatures, featureGroups.allWindowAggFeatures,
featureGroups.allPassthroughFeatures, updatedSeqJoinFeature)
val updatedKeyTaggedFeatures = keyTaggedFeatures.filter(feature => updatedAnchoredFeatures.contains(feature.featureName)
|| updatedDerivedFeatures.contains(feature.featureName) || featureGroups.allWindowAggFeatures.contains(feature.featureName)
|| featureGroups.allPassthroughFeatures.contains(feature.featureName) || updatedSeqJoinFeature.contains(feature.featureName))
(updatedFeatureGroups, updatedKeyTaggedFeatures)
}

/**
* Exclude anchored and derived features from the join stage if they do not have a valid path.
* @param featureToPathsMap Map of anchored feature names to their paths
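The heart of removeMissingFeatures is a reachability check: a derived feature survives only if every feature it consumes is still present (the real code additionally lets a derived feature consume another derived feature). A minimal, self-contained Scala sketch of that filtering idea, using plain collections and made-up feature names rather than the Feathr API:

object PruneExample extends App {
  // Anchored features that actually have data behind them.
  val anchoredWithData = Set("f_clicks", "f_views")

  // Derived feature -> the features it consumes.
  val derived = Map(
    "d_ctr"   -> Seq("f_clicks", "f_views"),
    "d_reach" -> Seq("f_views", "f_missing")) // depends on an anchor with no data

  // Keep a derived feature only if all of its inputs survived the pruning.
  val keptDerived = derived.filter { case (_, inputs) => inputs.forall(anchoredWithData.contains) }

  println(keptDerived.keySet) // Set(d_ctr): d_reach is dropped along with f_missing
}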
@@ -5,13 +5,14 @@ import com.linkedin.feathr.offline
import com.linkedin.feathr.offline.client.DataFrameColName
import com.linkedin.feathr.offline.client.DataFrameColName.getFeatureAlias
import com.linkedin.feathr.offline.config.FeatureJoinConfig
import com.linkedin.feathr.offline.config.sources.FeatureGroupsUpdater
import com.linkedin.feathr.offline.derived.DerivedFeatureEvaluator
import com.linkedin.feathr.offline.job.FeatureTransformation.transformSingleAnchorDF
import com.linkedin.feathr.offline.job.{FeatureTransformation, TransformedResult}
import com.linkedin.feathr.offline.job.{FeatureTransformation, LocalFeatureJoinJob, TransformedResult}
import com.linkedin.feathr.offline.join.algorithms._
import com.linkedin.feathr.offline.join.util.{FrequentItemEstimatorFactory, FrequentItemEstimatorType}
import com.linkedin.feathr.offline.join.workflow._
import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan}
import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan, MultiStageJoinPlanner}
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
import com.linkedin.feathr.offline.source.accessor.DataPathHandler
import com.linkedin.feathr.offline.swa.SlidingWindowAggregationJoiner
@@ -22,6 +23,7 @@ import com.linkedin.feathr.offline.util.FeathrUtils
import com.linkedin.feathr.offline.util.datetime.DateTimeInterval
import com.linkedin.feathr.offline.{ErasedEntityTaggedFeature, FeatureDataFrame}
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.util.sketch.BloomFilter

@@ -189,12 +191,21 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d
.toIndexedSeq
.map(featureGroups.allAnchoredFeatures),
failOnMissingPartition)

// Skip features with missing data if the job-level flag is set, or if the local-mode (test) override is enabled.
val shouldSkipFeature = (FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean) ||
(ss.sparkContext.isLocal && SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))
val updatedSourceAccessorMap = anchorSourceAccessorMap.filter(anchorEntry => anchorEntry._2.isDefined)
.map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get)

val (updatedFeatureGroups, updatedLogicalPlan) = if (shouldSkipFeature) {
val (newFeatureGroups, newKeyTaggedFeatures) = FeatureGroupsUpdater().removeMissingFeatures(featureGroups,
updatedSourceAccessorMap.keySet.flatMap(featureAnchorWithSource => featureAnchorWithSource.featureAnchor.features).toSeq, keyTaggedFeatures)

val newLogicalPlan = MultiStageJoinPlanner().getLogicalPlan(newFeatureGroups, newKeyTaggedFeatures)
(newFeatureGroups, newLogicalPlan)
} else (featureGroups, logicalPlan)

implicit val joinExecutionContext: JoinExecutionContext =
JoinExecutionContext(ss, logicalPlan, featureGroups, bloomFilters, Some(saltedJoinFrequentItemDFs))
JoinExecutionContext(ss, updatedLogicalPlan, updatedFeatureGroups, bloomFilters, Some(saltedJoinFrequentItemDFs))
// 3. Join sliding window aggregation features
val FeatureDataFrame(withWindowAggFeatureDF, inferredSWAFeatureTypes) =
joinSWAFeatures(ss, obsToJoinWithFeatures, joinConfig, featureGroups, failOnMissingPartition, bloomFilters, swaObsTime)
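The skip decision above combines a job-level Feathr parameter with a local-mode SQLConf override used by tests. A rough, standalone sketch of the same precedence using only the public SparkConf/RuntimeConfig API; the configuration keys here are hypothetical, since the real ones are resolved through FeathrUtils.getFeathrJobParam and LocalFeatureJoinJob.SKIP_MISSING_FEATURE:

import org.apache.spark.sql.SparkSession

object SkipFlagExample extends App {
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("skip-flag-example")
    .config("example.skip.missing.feature", "true") // hypothetical job-level key
    .getOrCreate()

  // Job-level flag, defaulting to false when unset.
  val jobLevelSkip = spark.sparkContext.getConf.getBoolean("example.skip.missing.feature", defaultValue = false)

  // In local runs (e.g. tests), an additional override can force skipping.
  val localOverride = spark.sparkContext.isLocal &&
    spark.conf.get("example.local.skip.missing.feature", "false").toBoolean

  val shouldSkipFeature = jobLevelSkip || localOverride
  println(s"shouldSkipFeature = $shouldSkipFeature")
  spark.stop()
}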
@@ -10,8 +10,7 @@ import com.linkedin.feathr.offline.config.location.{DataLocation, PathList, Simp
import com.linkedin.feathr.offline.generation.IncrementalAggContext
import com.linkedin.feathr.offline.job.LocalFeatureJoinJob
import com.linkedin.feathr.offline.source.DataSource
import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor
import com.linkedin.feathr.offline.source.accessor.DataPathHandler
import com.linkedin.feathr.offline.source.accessor.{DataPathHandler, DataSourceAccessor, NonTimeBasedDataSourceAccessor}
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
import com.linkedin.feathr.offline.source.pathutil.{PathChecker, TimeBasedHdfsPathAnalyzer}
import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils
@@ -70,12 +69,23 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH
}
}
val timeSeriesSource = try {
Some(DataSourceAccessor(ss = ss,
val dataSource = DataSourceAccessor(ss = ss,
source = source,
dateIntervalOpt = dateInterval,
expectDatumType = Some(expectDatumType),
failOnMissingPartition = failOnMissingPartition,
dataPathHandlers = dataPathHandlers))
dataPathHandlers = dataPathHandlers)

// A non-time-based source is only loaded into a dataframe at runtime, which is too late to decide whether the
// feature should be skipped. So try to take the first row here and see whether it succeeds.
if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && (shouldSkipFeature || (ss.sparkContext.isLocal &&
SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)))) {
if (dataSource.get().take(1).isEmpty) None else {
Some(dataSource)
}
} else {
Some(dataSource)
}
} catch {
case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal &&
SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) None else throw e
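The take(1) probe above is a cheap way to find out, at source-mapping time, whether a non-time-based source can actually produce rows instead of failing later inside the join. A small standalone example of the pattern with made-up data; only the take(1)-emptiness idea is taken from the diff:

import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Try

object EmptySourceProbe extends App {
  val spark = SparkSession.builder().master("local[1]").appName("probe").getOrCreate()
  import spark.implicits._

  // An empty source stands in for feature data that is missing.
  val source: DataFrame = Seq.empty[(Int, String)].toDF("a_id", "value")

  // Materialize at most one row; an empty result (or a read failure) means
  // the features backed by this source should be skipped.
  val usableSource: Option[DataFrame] = Try(source.take(1)).toOption.filter(_.nonEmpty).map(_ => source)

  println(usableSource.isDefined) // false: the source is empty, so its features would be skipped
  spark.stop()
}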
@@ -1,4 +1,4 @@
a_id
1
2
3
a_id,timestamp
1,2019-05-20
2,2019-05-19
3,2019-05-19
@@ -4,11 +4,12 @@ import com.linkedin.feathr.common.configObj.configbuilder.ConfigBuilderException
import com.linkedin.feathr.common.exception.FeathrConfigException
import com.linkedin.feathr.offline.config.location.SimplePath
import com.linkedin.feathr.offline.generation.SparkIOUtils
import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager
import com.linkedin.feathr.offline.job.{LocalFeatureJoinJob, PreprocessedDataFrameManager}
import com.linkedin.feathr.offline.source.dataloader.{AvroJsonDataLoader, CsvDataLoader}
import com.linkedin.feathr.offline.util.FeathrTestUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.testng.Assert.assertTrue
import org.testng.annotations.{BeforeClass, Test}
@@ -278,6 +279,113 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
// FeathrTestUtils.assertDataFrameApproximatelyEquals(filteredDf, expectedDf, cmpFunc)
}

/*
 * Test skipping a combination of anchored, derived and SWA features. anchor1 and swaSource intentionally point to
 * non-existent paths ("nullVaueSource", "generaion/daily"), so the features backed by them should be skipped,
 * while anchor2 and swaSource1 point to valid paths and their features should remain.
 */
@Test
def testSkipAnchoredFeatures: Unit = {
SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, true)
val df = runLocalFeatureJoinForTest(
joinConfigAsString =
"""
|settings: {
| joinTimeSettings: {
| timestampColumn: {
| def: "timestamp"
| format: "yyyy-MM-dd"
| }
| simulateTimeDelay: 1d
| }
|}
|
| features: {
| key: a_id
| featureList: ["featureWithNull", "derived_featureWithNull", "featureWithNull2", "derived_featureWithNull2",
| "aEmbedding", "memberEmbeddingAutoTZ"]
| }
""".stripMargin,
featureDefAsString =
"""
| sources: {
| swaSource: {
| location: { path: "generaion/daily" }
| timePartitionPattern: "yyyy/MM/dd"
| timeWindowParameters: {
| timestampColumn: "timestamp"
| timestampColumnFormat: "yyyy-MM-dd"
| }
| }
| swaSource1: {
| location: { path: "generation/daily" }
| timePartitionPattern: "yyyy/MM/dd"
| timeWindowParameters: {
| timestampColumn: "timestamp"
| timestampColumnFormat: "yyyy-MM-dd"
| }
| }
|}
|
| anchors: {
| anchor1: {
| source: "anchorAndDerivations/nullVaueSource.avro.json"
| key: "toUpperCaseExt(mId)"
| features: {
| featureWithNull: "isPresent(value) ? toNumeric(value) : 0"
| }
| }
| anchor2: {
| source: "anchorAndDerivations/nullValueSource.avro.json"
| key: "toUpperCaseExt(mId)"
| features: {
| featureWithNull2: "isPresent(value) ? toNumeric(value) : 0"
| }
| }
| swaAnchor: {
| source: "swaSource"
| key: "x"
| features: {
| aEmbedding: {
| def: "embedding"
| aggregation: LATEST
| window: 3d
| }
| }
| }
| swaAnchor1: {
| source: "swaSource1"
| key: "x"
| features: {
| memberEmbeddingAutoTZ: {
| def: "embedding"
| aggregation: LATEST
| window: 3d
| type: {
| type: TENSOR
| tensorCategory: SPARSE
| dimensionType: [INT]
| valType: FLOAT
| }
| }
| }
| }
|}
|derivations: {
|
| derived_featureWithNull: "featureWithNull * 2"
| derived_featureWithNull2: "featureWithNull2 * 2"
|}
""".stripMargin,
observationDataPath = "anchorAndDerivations/testMVELLoopExpFeature-observations.csv")

assertTrue(!df.data.columns.contains("featureWithNull"))
assertTrue(!df.data.columns.contains("derived_featureWithNull"))
assertTrue(df.data.columns.contains("derived_featureWithNull2"))
assertTrue(df.data.columns.contains("featureWithNull2"))
assertTrue(!df.data.columns.contains("aEmbedding"))
assertTrue(df.data.columns.contains("memberEmbeddingAutoTZ"))
SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, false)
}

/*
* Test features with null values.
*/
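One note on the new test: it turns SKIP_MISSING_FEATURE on at the start and back off in its last line, so a failed assertion in between would leave the flag set for later tests. A defensive variant (a sketch only, reusing the same config entry the test already uses) guarantees the reset with try/finally:

SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, true)
try {
  // run the join and the assertions here
} finally {
  SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, false)
}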
gradle.properties (1 addition, 1 deletion)
@@ -1,3 +1,3 @@
version=0.10.4-rc6
version=0.10.4-rc7
SONATYPE_AUTOMATIC_RELEASE=true
POM_ARTIFACT_ID=feathr_2.12