From 68e847955f41f0810d130484f89b309551104949 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Thu, 9 Feb 2023 21:35:55 -0800 Subject: [PATCH 1/5] Skip adding the Anchored and derived features in feature join --- .../config/sources/FeatureGroupsUpdater.scala | 41 +++++++++++++++ .../offline/join/DataFrameFeatureJoiner.scala | 19 +++++-- .../logical/MultiStageJoinPlanner.scala | 2 +- .../AnchorToDataSourceMapper.scala | 15 ++++-- .../offline/AnchoredFeaturesIntegTest.scala | 50 ++++++++++++++++++- 5 files changed, 117 insertions(+), 10 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala index c34d53fa4..3956511e5 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala @@ -110,6 +110,47 @@ private[offline] class FeatureGroupsUpdater { (updatedFeatureGroups, updatedKeyTaggedFeatures) } + /** + * Update the feature groups (for Feature gen) based on feature missing features. Few anchored features can be missing if the feature data + * is not present. Remove those anchored features, and also the corresponding derived feature which are dependent on it. + * + * @param featureGroups + * @param allStageFeatures + * @param keyTaggedFeatures + * @return + */ + def getUpdatedFeatureGroupsForJoin(featureGroups: FeatureGroups, allStageFeatures: Seq[String], + keyTaggedFeatures: Seq[JoiningFeatureParams]): (FeatureGroups, Seq[JoiningFeatureParams]) = { + val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureRow => + allStageFeatures.contains(featureRow._1)) ++ featureGroups.allWindowAggFeatures ++ featureGroups.allPassthroughFeatures + + val updatedSeqJoinFeature = featureGroups.allSeqJoinFeatures.filter(seqJoinFeature => { + // Find the constituent anchored features for every derived feature + val allAnchoredFeaturesInDerived = seqJoinFeature._2.consumedFeatureNames.map(_.getFeatureName) + val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived.map(feature => updatedAnchoredFeatures.contains(feature)) + !containsFeature.contains(false) + }) + + // Iterate over the derived features and remove the derived features which contains these anchored features. + val updatedDerivedFeatures = featureGroups.allDerivedFeatures.filter(derivedFeature => { + // Find the constituent anchored features for every derived feature + val allAnchoredFeaturesInDerived = derivedFeature._2.consumedFeatureNames.map(_.getFeatureName) + val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived.map(feature => updatedAnchoredFeatures.contains(feature) + || featureGroups.allDerivedFeatures.contains(feature)) + !containsFeature.contains(false) + }) ++ updatedSeqJoinFeature + + log.warn(s"Removed the following features:- ${featureGroups.allAnchoredFeatures.keySet.diff(updatedAnchoredFeatures.keySet)}," + + s"${featureGroups.allDerivedFeatures.keySet.diff(updatedDerivedFeatures.keySet)}," + + s" ${featureGroups.allSeqJoinFeatures.keySet.diff(updatedSeqJoinFeature.keySet)}") + val updatedFeatureGroups = FeatureGroups(updatedAnchoredFeatures, updatedDerivedFeatures, featureGroups.allWindowAggFeatures, + featureGroups.allPassthroughFeatures, updatedSeqJoinFeature) + val updatedKeyTaggedFeatures = keyTaggedFeatures.filter(feature => updatedAnchoredFeatures.contains(feature.featureName) + || updatedDerivedFeatures.contains(feature.featureName) || featureGroups.allWindowAggFeatures.contains(feature.featureName) + || featureGroups.allPassthroughFeatures.contains(feature.featureName) || updatedSeqJoinFeature.contains(feature.featureName)) + (updatedFeatureGroups, updatedKeyTaggedFeatures) + } + /** * Exclude anchored and derived features features from the join stage if they do not have a valid path. * @param featureToPathsMap Map of anchored feature names to their paths diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala index 25c696655..af1aded7f 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala @@ -5,13 +5,14 @@ import com.linkedin.feathr.offline import com.linkedin.feathr.offline.client.DataFrameColName import com.linkedin.feathr.offline.client.DataFrameColName.getFeatureAlias import com.linkedin.feathr.offline.config.FeatureJoinConfig +import com.linkedin.feathr.offline.config.sources.FeatureGroupsUpdater import com.linkedin.feathr.offline.derived.DerivedFeatureEvaluator import com.linkedin.feathr.offline.job.FeatureTransformation.transformSingleAnchorDF -import com.linkedin.feathr.offline.job.{FeatureTransformation, TransformedResult} +import com.linkedin.feathr.offline.job.{FeatureTransformation, LocalFeatureJoinJob, TransformedResult} import com.linkedin.feathr.offline.join.algorithms._ import com.linkedin.feathr.offline.join.util.{FrequentItemEstimatorFactory, FrequentItemEstimatorType} import com.linkedin.feathr.offline.join.workflow._ -import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan} +import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan, MultiStageJoinPlanner} import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.accessor.DataPathHandler import com.linkedin.feathr.offline.swa.SlidingWindowAggregationJoiner @@ -22,6 +23,7 @@ import com.linkedin.feathr.offline.util.FeathrUtils import com.linkedin.feathr.offline.util.datetime.DateTimeInterval import com.linkedin.feathr.offline.{ErasedEntityTaggedFeature, FeatureDataFrame} import org.apache.logging.log4j.LogManager +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.util.sketch.BloomFilter @@ -189,12 +191,21 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d .toIndexedSeq .map(featureGroups.allAnchoredFeatures), failOnMissingPartition) - + val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean val updatedSourceAccessorMap = anchorSourceAccessorMap.filter(anchorEntry => anchorEntry._2.isDefined) .map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) + val (updatedFeatureGroups, updatedKeyTaggedFeatures, updatedLogicalPlan) = if (shouldSkipFeature || (ss.sparkContext.isLocal && + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) { + val (newFeatureGroups, newKeyTaggedFeatures) = FeatureGroupsUpdater().getUpdatedFeatureGroupsForJoin(featureGroups, + updatedSourceAccessorMap.keySet.flatMap(featureAnchorWithSource => featureAnchorWithSource.featureAnchor.features).toSeq, keyTaggedFeatures) + + val newLogicalPlan = MultiStageJoinPlanner().getLogicalPlan(newFeatureGroups, newKeyTaggedFeatures) + (newFeatureGroups, newKeyTaggedFeatures, newLogicalPlan) + } else (featureGroups, keyTaggedFeatures, logicalPlan) + implicit val joinExecutionContext: JoinExecutionContext = - JoinExecutionContext(ss, logicalPlan, featureGroups, bloomFilters, Some(saltedJoinFrequentItemDFs)) + JoinExecutionContext(ss, updatedLogicalPlan, updatedFeatureGroups, bloomFilters, Some(saltedJoinFrequentItemDFs)) // 3. Join sliding window aggregation features val FeatureDataFrame(withWindowAggFeatureDF, inferredSWAFeatureTypes) = joinSWAFeatures(ss, obsToJoinWithFeatures, joinConfig, featureGroups, failOnMissingPartition, bloomFilters, swaObsTime) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/logical/MultiStageJoinPlanner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/logical/MultiStageJoinPlanner.scala index ac8d2e86e..7f971830d 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/logical/MultiStageJoinPlanner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/logical/MultiStageJoinPlanner.scala @@ -38,7 +38,7 @@ private[offline] class MultiStageJoinPlanner extends LogicalPlanner[MultiStageJo log.info(s"allRequestedFeatures: $allRequestedFeatures, keyTagIntsToStrings: $keyTagIntsToStrings") // Resolve feature dependencies - val allRequiredFeatures = getDependencyOrdering(featureGroups.allAnchoredFeatures, featureGroups.allDerivedFeatures, allRequestedFeatures) + val allRequiredFeatures = getDependencyOrdering(featureGroups.allAnchoredFeatures, featureGroups.allDerivedFeatures, allRequestedFeatures) log.info(s"allRequiredFeatures: $allRequiredFeatures") // Plan the join stages required to resolve all these features diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index d99a3535c..3ab9cf771 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -10,8 +10,7 @@ import com.linkedin.feathr.offline.config.location.{DataLocation, PathList, Simp import com.linkedin.feathr.offline.generation.IncrementalAggContext import com.linkedin.feathr.offline.job.LocalFeatureJoinJob import com.linkedin.feathr.offline.source.DataSource -import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor -import com.linkedin.feathr.offline.source.accessor.DataPathHandler +import com.linkedin.feathr.offline.source.accessor.{DataPathHandler, DataSourceAccessor, NonTimeBasedDataSourceAccessor} import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler import com.linkedin.feathr.offline.source.pathutil.{PathChecker, TimeBasedHdfsPathAnalyzer} import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils @@ -70,12 +69,20 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH } } val timeSeriesSource = try { - Some(DataSourceAccessor(ss = ss, + val dataSource = DataSourceAccessor(ss = ss, source = source, dateIntervalOpt = dateInterval, expectDatumType = Some(expectDatumType), failOnMissingPartition = failOnMissingPartition, - dataPathHandlers = dataPathHandlers)) + dataPathHandlers = dataPathHandlers) + if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && (shouldSkipFeature || (ss.sparkContext.isLocal && + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)))) { + if (dataSource.get().take(1).isEmpty) None else { + Some(dataSource) + } + } else { + Some(dataSource) + } } catch { case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) None else throw e diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala index a9185f7a2..b4e427ebc 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala @@ -4,11 +4,12 @@ import com.linkedin.feathr.common.configObj.configbuilder.ConfigBuilderException import com.linkedin.feathr.common.exception.FeathrConfigException import com.linkedin.feathr.offline.config.location.SimplePath import com.linkedin.feathr.offline.generation.SparkIOUtils -import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager +import com.linkedin.feathr.offline.job.{LocalFeatureJoinJob, PreprocessedDataFrameManager} import com.linkedin.feathr.offline.source.dataloader.{AvroJsonDataLoader, CsvDataLoader} import com.linkedin.feathr.offline.util.FeathrTestUtils import org.apache.spark.sql.Row import org.apache.spark.sql.functions.col +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.testng.Assert.assertTrue import org.testng.annotations.{BeforeClass, Test} @@ -278,6 +279,53 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { // FeathrTestUtils.assertDataFrameApproximatelyEquals(filteredDf, expectedDf, cmpFunc) } + /* + * Test skip anchored features. + */ + @Test + def testSkipAnchoredFeatures: Unit = { + SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, true) + val df = runLocalFeatureJoinForTest( + joinConfigAsString = + """ + | features: { + | key: a_id + | featureList: ["featureWithNull", "derived_featureWithNull", "featureWithNull2", "derived_featureWithNull2"] + | } + """.stripMargin, + featureDefAsString = + """ + | anchors: { + | anchor1: { + | source: "anchorAndDerivations/nullVaueSource.avro.json" + | key: "toUpperCaseExt(mId)" + | features: { + | featureWithNull: "isPresent(value) ? toNumeric(value) : 0" + | } + | } + | anchor2: { + | source: "anchorAndDerivations/nullValueSource.avro.json" + | key: "toUpperCaseExt(mId)" + | features: { + | featureWithNull2: "isPresent(value) ? toNumeric(value) : 0" + | } + | } + |} + |derivations: { + | + | derived_featureWithNull: "featureWithNull * 2" + | derived_featureWithNull2: "featureWithNull2 * 2" + |} + """.stripMargin, + observationDataPath = "anchorAndDerivations/testMVELLoopExpFeature-observations.csv") + + assertTrue(!df.data.columns.contains("featureWithNull")) + assertTrue(!df.data.columns.contains("derived_featureWithNull")) + assertTrue(df.data.columns.contains("derived_featureWithNull2")) + assertTrue(df.data.columns.contains("featureWithNull2")) + SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, false) + } + /* * Test features with null values. */ From d78aea04ee286d1073d137d9c695a4ccbbd62422 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Thu, 9 Feb 2023 21:59:26 -0800 Subject: [PATCH 2/5] Add complex test case --- .../config/sources/FeatureGroupsUpdater.scala | 8 ++- .../testMVELLoopExpFeature-observations.csv | 8 +-- .../offline/AnchoredFeaturesIntegTest.scala | 64 ++++++++++++++++++- 3 files changed, 71 insertions(+), 9 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala index 3956511e5..a0271ade5 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala @@ -111,7 +111,7 @@ private[offline] class FeatureGroupsUpdater { } /** - * Update the feature groups (for Feature gen) based on feature missing features. Few anchored features can be missing if the feature data + * Update the feature groups (for Feature join) based on feature missing features. Few anchored features can be missing if the feature data * is not present. Remove those anchored features, and also the corresponding derived feature which are dependent on it. * * @param featureGroups @@ -119,10 +119,12 @@ private[offline] class FeatureGroupsUpdater { * @param keyTaggedFeatures * @return */ - def getUpdatedFeatureGroupsForJoin(featureGroups: FeatureGroups, allStageFeatures: Seq[String], + def getUpdatedFeatureGroupsForJoin(featureGroups: FeatureGroups, allAnchoredFeaturesWithData: Seq[String], keyTaggedFeatures: Seq[JoiningFeatureParams]): (FeatureGroups, Seq[JoiningFeatureParams]) = { + + // We need to add the window agg features to it as they are also considered anchored features. val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureRow => - allStageFeatures.contains(featureRow._1)) ++ featureGroups.allWindowAggFeatures ++ featureGroups.allPassthroughFeatures + allAnchoredFeaturesWithData.contains(featureRow._1)) ++ featureGroups.allWindowAggFeatures ++ featureGroups.allPassthroughFeatures val updatedSeqJoinFeature = featureGroups.allSeqJoinFeatures.filter(seqJoinFeature => { // Find the constituent anchored features for every derived feature diff --git a/feathr-impl/src/test/resources/anchorAndDerivations/testMVELLoopExpFeature-observations.csv b/feathr-impl/src/test/resources/anchorAndDerivations/testMVELLoopExpFeature-observations.csv index c61d9a73a..8b5f62271 100644 --- a/feathr-impl/src/test/resources/anchorAndDerivations/testMVELLoopExpFeature-observations.csv +++ b/feathr-impl/src/test/resources/anchorAndDerivations/testMVELLoopExpFeature-observations.csv @@ -1,4 +1,4 @@ -a_id -1 -2 -3 +a_id,timestamp +1,2019-05-20 +2,2019-05-19 +3,2019-05-19 diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala index b4e427ebc..bd4c086ce 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala @@ -280,7 +280,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { } /* - * Test skip anchored features. + * Test skipping combination of anchored, derived and swa features. */ @Test def testSkipAnchoredFeatures: Unit = { @@ -288,13 +288,43 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { val df = runLocalFeatureJoinForTest( joinConfigAsString = """ + |settings: { + | joinTimeSettings: { + | timestampColumn: { + | def: "timestamp" + | format: "yyyy-MM-dd" + | } + | simulateTimeDelay: 1d + | } + |} + | | features: { | key: a_id - | featureList: ["featureWithNull", "derived_featureWithNull", "featureWithNull2", "derived_featureWithNull2"] + | featureList: ["featureWithNull", "derived_featureWithNull", "featureWithNull2", "derived_featureWithNull2", + | "aEmbedding", "memberEmbeddingAutoTZ"] | } """.stripMargin, featureDefAsString = """ + | sources: { + | swaSource: { + | location: { path: "generaion/daily" } + | timePartitionPattern: "yyyy/MM/dd" + | timeWindowParameters: { + | timestampColumn: "timestamp" + | timestampColumnFormat: "yyyy-MM-dd" + | } + | } + | swaSource1: { + | location: { path: "generation/daily" } + | timePartitionPattern: "yyyy/MM/dd" + | timeWindowParameters: { + | timestampColumn: "timestamp" + | timestampColumnFormat: "yyyy-MM-dd" + | } + | } + |} + | | anchors: { | anchor1: { | source: "anchorAndDerivations/nullVaueSource.avro.json" @@ -310,6 +340,34 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { | featureWithNull2: "isPresent(value) ? toNumeric(value) : 0" | } | } + | swaAnchor: { + | source: "swaSource" + | key: "x" + | features: { + | aEmbedding: { + | def: "embedding" + | aggregation: LATEST + | window: 3d + | } + | } + | } + | swaAnchor1: { + | source: "swaSource1" + | key: "x" + | features: { + | memberEmbeddingAutoTZ: { + | def: "embedding" + | aggregation: LATEST + | window: 3d + | type: { + | type: TENSOR + | tensorCategory: SPARSE + | dimensionType: [INT] + | valType: FLOAT + | } + | } + | } + | } |} |derivations: { | @@ -323,6 +381,8 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { assertTrue(!df.data.columns.contains("derived_featureWithNull")) assertTrue(df.data.columns.contains("derived_featureWithNull2")) assertTrue(df.data.columns.contains("featureWithNull2")) + assertTrue(!df.data.columns.contains("aEmbedding")) + assertTrue(df.data.columns.contains("memberEmbeddingAutoTZ")) SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, false) } From 6bfef8bd95200c9bdcd4972e46b1dc390565a429 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Thu, 9 Feb 2023 22:10:34 -0800 Subject: [PATCH 3/5] Add comments --- .../feathr/offline/logical/MultiStageJoinPlanner.scala | 2 +- .../offline/transformation/AnchorToDataSourceMapper.scala | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/logical/MultiStageJoinPlanner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/logical/MultiStageJoinPlanner.scala index 7f971830d..ac8d2e86e 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/logical/MultiStageJoinPlanner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/logical/MultiStageJoinPlanner.scala @@ -38,7 +38,7 @@ private[offline] class MultiStageJoinPlanner extends LogicalPlanner[MultiStageJo log.info(s"allRequestedFeatures: $allRequestedFeatures, keyTagIntsToStrings: $keyTagIntsToStrings") // Resolve feature dependencies - val allRequiredFeatures = getDependencyOrdering(featureGroups.allAnchoredFeatures, featureGroups.allDerivedFeatures, allRequestedFeatures) + val allRequiredFeatures = getDependencyOrdering(featureGroups.allAnchoredFeatures, featureGroups.allDerivedFeatures, allRequestedFeatures) log.info(s"allRequiredFeatures: $allRequiredFeatures") // Plan the join stages required to resolve all these features diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index 3ab9cf771..745b28975 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -75,6 +75,9 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH expectDatumType = Some(expectDatumType), failOnMissingPartition = failOnMissingPartition, dataPathHandlers = dataPathHandlers) + + // If it is a nonTime based source, we will load the dataframe at runtime, however this is too late to decide if the + // feature should be skipped. So, we will try to take the first row here, and see if it succeeds. if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && (shouldSkipFeature || (ss.sparkContext.isLocal && SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)))) { if (dataSource.get().take(1).isEmpty) None else { From c636e6236591738d2441085ebcf70e76b5678ee5 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Fri, 10 Feb 2023 08:53:33 -0800 Subject: [PATCH 4/5] minor cosmetic issue --- .../feathr/offline/join/DataFrameFeatureJoiner.scala | 6 +++--- gradle.properties | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala index af1aded7f..d3253217f 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala @@ -191,12 +191,12 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d .toIndexedSeq .map(featureGroups.allAnchoredFeatures), failOnMissingPartition) - val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean + val shouldSkipFeature = (FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean) || + (ss.sparkContext.isLocal && SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) val updatedSourceAccessorMap = anchorSourceAccessorMap.filter(anchorEntry => anchorEntry._2.isDefined) .map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) - val (updatedFeatureGroups, updatedKeyTaggedFeatures, updatedLogicalPlan) = if (shouldSkipFeature || (ss.sparkContext.isLocal && - SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) { + val (updatedFeatureGroups, updatedKeyTaggedFeatures, updatedLogicalPlan) = if (shouldSkipFeature) { val (newFeatureGroups, newKeyTaggedFeatures) = FeatureGroupsUpdater().getUpdatedFeatureGroupsForJoin(featureGroups, updatedSourceAccessorMap.keySet.flatMap(featureAnchorWithSource => featureAnchorWithSource.featureAnchor.features).toSeq, keyTaggedFeatures) diff --git a/gradle.properties b/gradle.properties index bec161da9..a7a14993b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=0.10.4-rc6 +version=0.10.4-rc7 SONATYPE_AUTOMATIC_RELEASE=true POM_ARTIFACT_ID=feathr_2.12 From 6691717fea11ac53572941c1fe42b676918df980 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Fri, 10 Feb 2023 09:05:48 -0800 Subject: [PATCH 5/5] Method name change --- .../offline/config/sources/FeatureGroupsUpdater.scala | 2 +- .../feathr/offline/join/DataFrameFeatureJoiner.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala index a0271ade5..ff31b3ad0 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala @@ -119,7 +119,7 @@ private[offline] class FeatureGroupsUpdater { * @param keyTaggedFeatures * @return */ - def getUpdatedFeatureGroupsForJoin(featureGroups: FeatureGroups, allAnchoredFeaturesWithData: Seq[String], + def removeMissingFeatures(featureGroups: FeatureGroups, allAnchoredFeaturesWithData: Seq[String], keyTaggedFeatures: Seq[JoiningFeatureParams]): (FeatureGroups, Seq[JoiningFeatureParams]) = { // We need to add the window agg features to it as they are also considered anchored features. diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala index d3253217f..90e8ceb8f 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala @@ -196,13 +196,13 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d val updatedSourceAccessorMap = anchorSourceAccessorMap.filter(anchorEntry => anchorEntry._2.isDefined) .map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) - val (updatedFeatureGroups, updatedKeyTaggedFeatures, updatedLogicalPlan) = if (shouldSkipFeature) { - val (newFeatureGroups, newKeyTaggedFeatures) = FeatureGroupsUpdater().getUpdatedFeatureGroupsForJoin(featureGroups, + val (updatedFeatureGroups, updatedLogicalPlan) = if (shouldSkipFeature) { + val (newFeatureGroups, newKeyTaggedFeatures) = FeatureGroupsUpdater().removeMissingFeatures(featureGroups, updatedSourceAccessorMap.keySet.flatMap(featureAnchorWithSource => featureAnchorWithSource.featureAnchor.features).toSeq, keyTaggedFeatures) val newLogicalPlan = MultiStageJoinPlanner().getLogicalPlan(newFeatureGroups, newKeyTaggedFeatures) - (newFeatureGroups, newKeyTaggedFeatures, newLogicalPlan) - } else (featureGroups, keyTaggedFeatures, logicalPlan) + (newFeatureGroups, newLogicalPlan) + } else (featureGroups, logicalPlan) implicit val joinExecutionContext: JoinExecutionContext = JoinExecutionContext(ss, updatedLogicalPlan, updatedFeatureGroups, bloomFilters, Some(saltedJoinFrequentItemDFs))