Feature names conflicts check and auto-correction #1024

Merged · 18 commits · Feb 14, 2023
16 changes: 15 additions & 1 deletion docs/concepts/get-offline-features.md
@@ -71,11 +71,25 @@ The path of a dataset as the 'spine' for the to-be-created training dataset. We
2. **Timestamp Column:** A column representing the event time of the row. By default, Feathr will make sure the feature values queried have a timestamp earlier than the timestamp in observation data, ensuring no data leakage in the resulting training dataset. Refer to [Point in time Joins](./point-in-time-join.md) for more details.

3. **Other columns** will simply be passed through to the output training dataset and can be treated as immutable columns.

## More on `Feature Query`

After you have defined all the features, you may not want to use all of them in a particular program. In that case, instead of putting every feature in the `FeatureQuery`, you can list only the selected features. Note that they must share the same key.
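The same-key constraint can be illustrated with a small plain-Python sketch (no Feathr dependency; the registry and feature names below are made up for illustration, not Feathr's API):

```python
def select_features(registry, wanted):
    """Return the selected features, refusing mixed-key selections."""
    # Collect the key of every requested feature; a valid query uses one key.
    keys = {registry[name] for name in wanted}
    if len(keys) != 1:
        raise ValueError(f"features in one query must share the same key, got {sorted(keys)}")
    return list(wanted)

# Hypothetical feature -> key registry
registry = {
    "f_trip_distance": "trip_id",
    "f_trip_time": "trip_id",
    "f_user_age": "user_id",
}

print(select_features(registry, ["f_trip_distance", "f_trip_time"]))
# → ['f_trip_distance', 'f_trip_time']
```

Selecting `f_trip_distance` together with `f_user_age` would fail, since they are keyed differently.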

## Feature names conflicts check

If any of the feature names requested in the `Feature Query` conflict with column names of the observation dataset, the `get_offline_features` job will fail, and it can take several minutes for Spark to report this failure. To avoid this slowness, Feathr can check whether any such conflicts exist before submitting the job to the cloud.

The checking steps are:
1. Check whether `conflicts_auto_correction` in the `observation_settings` is set (it defaults to `None`). If it is not `None`, Spark will handle detecting and resolving the conflicts, and the Python client submits the job to Spark directly; otherwise, the client proceeds with the steps below. `conflicts_auto_correction` itself takes two parameters, `rename_features` and `suffix`. By default, Spark renames the conflicting dataset columns with the suffix `_1`; you can rename the feature names instead by setting `rename_features` to `True`, and you can provide a customized suffix.
2. Try to load the dataset without credentials and compare its column names with the feature names. This covers the case where the dataset is stored in public storage.
3. If the dataset cannot be loaded in step 2, try to load it with credentials and compare its column names with the feature names. This only works for storage whose credentials are defined in your environment. For example, if your `spark_cluster` is `databricks`, only datasets under the `dbfs` path of that Databricks workspace can be loaded.
4. If the dataset cannot be loaded in steps 2 and 3, compare the feature names against the column names provided by the `dataset_column_names` parameter, if it is not empty.
5. If no column names could be obtained from the steps above, show a warning message and submit the job to the cloud; Spark will also check for these conflicts.
6. If any conflicts are found in steps 2 to 4, throw an exception and stop the process. To resolve the conflicts, you can either rename the affected dataset columns or rename the features. If you rename features that are already registered, you may need to register them again with the updated names under a new project name.
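The decision flow of the steps above can be sketched in plain Python (all names here are illustrative, not Feathr's client API):

```python
def check_conflicts(feature_names, observation_columns=None, auto_correction=None,
                    dataset_column_names=None):
    """Sketch of the client-side pre-submission conflict check.

    Returns 'submit' when the job may go to the cluster; raises on conflicts.
    """
    # Step 1: if conflicts_auto_correction is set, Spark resolves conflicts itself.
    if auto_correction is not None:
        return "submit"
    # Steps 2-4: use whatever column names could be obtained, in order of
    # preference (public load, credentialed load, dataset_column_names parameter).
    columns = observation_columns or dataset_column_names
    if columns is None:
        # Step 5: nothing to compare against; warn and let Spark check.
        print("warning: could not determine observation columns; Spark will check")
        return "submit"
    conflicts = set(feature_names) & set(columns)
    if conflicts:
        # Step 6: fail fast instead of waiting minutes for the Spark failure.
        raise ValueError(f"feature names conflict with observation columns: {sorted(conflicts)}")
    return "submit"
```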

Workflow graph for the conflicts checking and handling:
![conflicts-check-and-handle](../images/conflicts-check-and-handle.png)

## Difference between `materialize_features` and `get_offline_features` API

The "getting offline features" described in this document is sometimes confused with "[getting materialized features](./materializing-features.md)", since both seem to "get features and put them somewhere". However, there are differences, and you should know when to use which:
Binary file added docs/images/conflicts-check-and-handle.png
@@ -4,7 +4,7 @@ import com.linkedin.feathr.common.exception._
import com.linkedin.feathr.common.{FeatureInfo, Header, InternalApi, JoiningFeatureParams, RichConfig, TaggedFeatureName}
import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource
import com.linkedin.feathr.offline.config.sources.FeatureGroupsUpdater
import com.linkedin.feathr.offline.config.{FeathrConfig, FeathrConfigLoader, FeatureGroupsGenerator, FeatureJoinConfig}
import com.linkedin.feathr.offline.config.{ConflictsAutoCorrectionSetting, FeathrConfig, FeathrConfigLoader, FeatureGroupsGenerator, FeatureJoinConfig, JoinConfigSettings}
import com.linkedin.feathr.offline.generation.{DataFrameFeatureGenerator, FeatureGenKeyTagAnalyzer, StreamingFeatureGenerator}
import com.linkedin.feathr.offline.job._
import com.linkedin.feathr.offline.join.DataFrameFeatureJoiner
@@ -18,6 +18,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.internal.SQLConf

import scala.util.{Failure, Success}
import java.util.UUID

/**
* FeathrClient is the entry point into Feathr for joining observation data with features. To achieve this, instantiate this class
@@ -207,6 +208,43 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
keyTaggedFeatures.map(_.featureName).intersect(fieldNames)
}

/**
* Rename feature names that conflict with dataset column names
* by applying the provided suffix.
* E.g. given the feature name 'example' and the suffix '1',
* it becomes 'example_1' after renaming.
*
* @param df original dataframe
* @param header original header
* @param conflictFeatureNames conflicting feature names
* @param suffix suffix to apply to feature names
*
* @return pair of renamed (dataframe, header)
*/
private[offline] def renameFeatureNames(
df: DataFrame,
header: Header,
conflictFeatureNames: Seq[String],
suffix: String): (DataFrame, Header) = {
val uuid = UUID.randomUUID()
var renamedDF = df
conflictFeatureNames.foreach(name => {
renamedDF = renamedDF.withColumnRenamed(name, name + '_' + uuid)
renamedDF = renamedDF.withColumnRenamed(name + '_' + suffix, name)
renamedDF = renamedDF.withColumnRenamed(name + '_' + uuid, name + '_' + suffix)
})

val featuresInfoMap = header.featureInfoMap.map {
case (featureName, featureInfo) =>
val name = featureInfo.columnName
val conflict = conflictFeatureNames.contains(name)
val fi = if (conflict) new FeatureInfo(name + '_' + suffix, featureInfo.featureType) else featureInfo
val fn = if (conflict) new TaggedFeatureName(featureName.getKeyTag, name + '_' + suffix) else featureName
fn -> fi
}
(renamedDF, new Header(featuresInfoMap))
}
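The three renames above swap the feature column and the already-suffixed observation column through a collision-free temporary name. The same swap can be sketched over a plain list of column names:

```python
import uuid

def swap_feature_suffix(columns, conflict_names, suffix):
    """Mimic the three-step rename on a list of column names.

    After the join, both the feature column 'name' and the observation column
    already renamed to 'name_<suffix>' are present; a UUID-based temporary
    name lets the two swap without ever colliding.
    """
    cols = list(columns)
    for name in conflict_names:
        tmp = f"{name}_{uuid.uuid4()}"
        cols = [tmp if c == name else c for c in cols]                 # feature col -> temp
        cols = [name if c == f"{name}_{suffix}" else c for c in cols]  # obs col -> original name
        cols = [f"{name}_{suffix}" if c == tmp else c for c in cols]   # feature col -> suffixed
    return cols

print(swap_feature_suffix(["example_1", "example", "other"], ["example"], "1"))
# → ['example', 'example_1', 'other']
```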

/**
* Join all requested feature to the observation dataset
*
@@ -269,16 +307,46 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
"Feature names must conform to " +
s"regular expression: ${AnchorUtils.featureNamePattern}, but found feature names: $invalidFeatureNames")
}

val joiner = new DataFrameFeatureJoiner(logicalPlan=logicalPlan,dataPathHandlers=dataPathHandlers, mvelContext)
// Check conflicts between feature names and data set column names
val conflictFeatureNames: Seq[String] = findConflictFeatureNames(keyTaggedFeatures, left.schema.fieldNames)
val joinConfigSettings = joinConfig.settings
val conflictsAutoCorrectionSetting = if(joinConfigSettings.isDefined) joinConfigSettings.get.conflictsAutoCorrectionSetting else None
if (conflictFeatureNames.nonEmpty) {
throw new FeathrConfigException(
ErrorLabel.FEATHR_USER_ERROR,
"Feature names must be different from field names in the observation data. " +
s"Please rename feature ${conflictFeatureNames} or rename the same field names in the observation data.")
}
if(!conflictsAutoCorrectionSetting.isDefined) {
throw new FeathrConfigException(
ErrorLabel.FEATHR_USER_ERROR,
"Feature names must be different from field names in the observation data. " +
s"Please rename feature ${conflictFeatureNames} or rename the same field names in the observation data.")
}

val joiner = new DataFrameFeatureJoiner(logicalPlan=logicalPlan,dataPathHandlers=dataPathHandlers, mvelContext)
joiner.joinFeaturesAsDF(sparkSession, joinConfig, updatedFeatureGroups, keyTaggedFeatures, left, rowBloomFilterThreshold)
// If auto correction is enabled, resolve the conflicts automatically
val renameFeatures = conflictsAutoCorrectionSetting.get.renameFeatureList
val suffix = conflictsAutoCorrectionSetting.get.suffix
log.warn(s"Found conflicting feature names: ${conflictFeatureNames}. Will auto correct them by applying the suffix: ${suffix}")
var leftRenamed = left
conflictFeatureNames.foreach(name => {
leftRenamed = leftRenamed.withColumnRenamed(name, name+'_'+suffix)
})
val conflictFeatureNames2: Seq[String] = findConflictFeatureNames(keyTaggedFeatures, leftRenamed.schema.fieldNames)
if (conflictFeatureNames2.nonEmpty) {
throw new FeathrConfigException(
ErrorLabel.FEATHR_USER_ERROR,
s"Failed to apply auto correction to solve conflicts. Still got conflicts after applying provided suffix ${suffix} for fields: " +
s"${conflictFeatureNames}. Please provide another suffix or solve conflicts manually.")
}
val (df, header) = joiner.joinFeaturesAsDF(sparkSession, joinConfig, updatedFeatureGroups, keyTaggedFeatures, leftRenamed, rowBloomFilterThreshold)
if(renameFeatures) {
log.warn(s"Suffix '${suffix}' was applied to the feature names ${conflictFeatureNames} to avoid conflicts in the output")
renameFeatureNames(df, header, conflictFeatureNames, suffix)
} else {
log.warn(s"Suffix '${suffix}' was applied to the dataset column names ${conflictFeatureNames} to avoid conflicts in the output")
(df, header)
}
}
else
joiner.joinFeaturesAsDF(sparkSession, joinConfig, updatedFeatureGroups, keyTaggedFeatures, left, rowBloomFilterThreshold)
}

private[offline] def getFeatureGroups(): FeatureGroups = {
@@ -283,8 +283,9 @@ case class DateTimeRange(startTime: LocalDateTime, endTime: LocalDateTime)
* Join config settings, consists of [[ObservationDataTimeSettings]] class, and [[JoinTimeSettings]] class (both optional).
* @param observationDataTimeSetting Settings which have parameters specifying how to load the observation data.
* @param joinTimeSetting Settings which have parameters specifying how to join the observation data with the feature data.
* @param conflictsAutoCorrectionSetting Settings with parameters specifying how to automatically resolve conflicts between feature names and dataset column names
*/
private[feathr] case class JoinConfigSettings(observationDataTimeSetting: Option[ObservationDataTimeSetting], joinTimeSetting: Option[JoinTimeSetting]) {}
private[feathr] case class JoinConfigSettings(observationDataTimeSetting: Option[ObservationDataTimeSetting], joinTimeSetting: Option[JoinTimeSetting], conflictsAutoCorrectionSetting: Option[ConflictsAutoCorrectionSetting]) {}

/**
* ObservationDataTimeSetting Definition class with parameters required to load time partitioned observation data used for the feature join.
@@ -308,6 +309,14 @@ private[offline] case class ObservationDataTimeSetting(dateParam: DateParam, tim
@JsonDeserialize(using = classOf[JoinTimeConfigSettingDefinitionDeserializer])
private[offline] case class JoinTimeSetting(timestampColumn: TimestampColumn, simulateTimeDelay: Option[Duration], useLatestFeatureData: Boolean)

/**
* ConflictsAutoCorrectionSetting object. Contains the parameters that control how name conflicts between feature names and dataset columns are auto-corrected.
* @param renameFeatureList Whether to rename the features; 'false' by default, which means the dataset columns are renamed instead
* @param suffix Suffix used to rename the conflicting names
*/
@JsonDeserialize(using = classOf[ConflictsAutoCorrectionSettingDeserializer])
private[offline] case class ConflictsAutoCorrectionSetting(renameFeatureList: Boolean, suffix: String)
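For reference, a join-config fragment exercising this setting might look like the sketch below. The `conflictsAutoCorrectionSettings` field names match the deserializer in this change; the surrounding `settings` layout is abbreviated and other settings (e.g. `joinTimeSettings`) are omitted:

```
settings: {
  conflictsAutoCorrectionSettings: {
    renameFeatures: True
    suffix: "1"
  }
}
```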

/**
* Timestamp column object
* @param name Name of the timestamp column, can be sql expression.
@@ -2,7 +2,7 @@ package com.linkedin.feathr.offline.config

import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.core.{JsonParser, TreeNode}
import com.fasterxml.jackson.databind.node.{ObjectNode, TextNode, TreeTraversingParser}
import com.fasterxml.jackson.databind.node.{NumericNode, ObjectNode, TextNode, TreeTraversingParser, ValueNode}
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode}
import com.linkedin.feathr.common.DateParam
import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrConfigException}
@@ -18,6 +18,7 @@ private class FeatureJoinConfigDeserializer extends JsonDeserializer[FeatureJoin
type Arg = Map[String, Seq[KeyedFeatureList]]
private val OBSERVATION_DATA_TIME_SETTINGS = "observationDataTimeSettings"
private val JOIN_TIME_SETTINGS = "joinTimeSettings"
private val CONFLICTS_AUTO_CORRECTION_SETTINGS = "conflictsAutoCorrectionSettings"

override def deserialize(jp: JsonParser, ctxt: DeserializationContext): FeatureJoinConfig = {

@@ -48,7 +49,7 @@
*/
private def parseFeatureJoinConfigSettings(settingsNode: JsonNode, jp: JsonParser): JoinConfigSettings = {

val supportedSettingFields = Set(OBSERVATION_DATA_TIME_SETTINGS, JOIN_TIME_SETTINGS)
val supportedSettingFields = Set(OBSERVATION_DATA_TIME_SETTINGS, JOIN_TIME_SETTINGS, CONFLICTS_AUTO_CORRECTION_SETTINGS)
val unrecognizedFields = settingsNode.fieldNames().asScala.toSet -- supportedSettingFields
if (unrecognizedFields.nonEmpty) {
throw new FeathrConfigException(
@@ -70,7 +71,14 @@ private class FeatureJoinConfigDeserializer extends JsonDeserializer[FeatureJoin
case _ => None
}

JoinConfigSettings(observationDataTimeSettingsDefinition, joinTimeSettingsDefinition)
val conflictsAutoCorrectionSetting = settingsNode.get(CONFLICTS_AUTO_CORRECTION_SETTINGS) match {
case autoCorrectionNode: ObjectNode =>
val autoCorrectionTreeParser = new TreeTraversingParser(autoCorrectionNode, jp.getCodec)
Some(autoCorrectionTreeParser.getCodec.readValue(autoCorrectionTreeParser, classOf[ConflictsAutoCorrectionSetting]))
case _ => None
}

JoinConfigSettings(observationDataTimeSettingsDefinition, joinTimeSettingsDefinition, conflictsAutoCorrectionSetting)
}
}

@@ -254,6 +262,35 @@ private class JoinTimeConfigSettingDefinitionDeserializer extends JsonDeserializ
}
}

/**
* ConflictsAutoCorrectionSetting config deserializer.
*/
private class ConflictsAutoCorrectionSettingDeserializer extends JsonDeserializer[ConflictsAutoCorrectionSetting] {
val RENAME_FEATURES = "renameFeatures"
val RENAME_FEATURES_DEFAULT_VALUE = false
val AUTO_CORRECTION_SUFFIX = "suffix"
val AUTO_CORRECTION_SUFFIX_VALUE = "1"

override def deserialize(p: JsonParser, ctxt: DeserializationContext): ConflictsAutoCorrectionSetting = {
val codec = p.getCodec
val node = codec.readTree[TreeNode](p)

node match {
case innerNode: ObjectNode =>
val renameFeatures = innerNode.get(RENAME_FEATURES) match {
case value: TextNode => (value.textValue() == "True")
case _ => RENAME_FEATURES_DEFAULT_VALUE
}
val suffix = innerNode.get(AUTO_CORRECTION_SUFFIX) match {
case suffix: TextNode => suffix.textValue()
case suffix: NumericNode => suffix.asInt().toString
case _ => AUTO_CORRECTION_SUFFIX_VALUE
}
ConflictsAutoCorrectionSetting(renameFeatures, suffix)
case _ => ConflictsAutoCorrectionSetting(RENAME_FEATURES_DEFAULT_VALUE, AUTO_CORRECTION_SUFFIX_VALUE)
}
}
}
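The deserializer's defaulting rules can be mirrored in a small plain-Python analogue, with a dict standing in for the Jackson tree. Note the behavior it makes visible: `renameFeatures` is only enabled by the exact text "True"; any other value, including "true", falls back to the default:

```python
def parse_auto_correction(node):
    """Mirror ConflictsAutoCorrectionSetting parsing on a plain dict."""
    RENAME_DEFAULT, SUFFIX_DEFAULT = False, "1"
    if not isinstance(node, dict):
        # Non-object node: fall back entirely to defaults.
        return (RENAME_DEFAULT, SUFFIX_DEFAULT)
    # TextNode case: only the exact string "True" enables renaming.
    rename = node.get("renameFeatures") == "True"
    suffix = node.get("suffix", SUFFIX_DEFAULT)
    if isinstance(suffix, int):
        # NumericNode case: e.g. 1 becomes "1".
        suffix = str(suffix)
    elif not isinstance(suffix, str):
        suffix = SUFFIX_DEFAULT
    return (rename, suffix)

print(parse_auto_correction({"renameFeatures": "True", "suffix": 2}))
# → (True, '2')
```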
/**
* Parameters which relate on how to load the time range relative to current timestamp (reference time).
* For example, as reference time is LATEST, then we take the job execution timestamp as the reference time and the other fields are relative to this
@@ -30,7 +30,7 @@ private[converters] object PegasusRecordSettingsConverter extends PegasusRecordS
override def convert(settings: Settings): JoinConfigSettings = {
val inputDataTimeSettings = Option(settings.getInputDataTimeSettings(GetMode.DEFAULT)).map(convertInputDataTimeSettings)
val joinTimeSetting = Option(settings.getJoinTimeSettings(GetMode.DEFAULT)).map(convertJoinTimeSettings)
JoinConfigSettings(inputDataTimeSettings, joinTimeSetting)
JoinConfigSettings(inputDataTimeSettings, joinTimeSetting, None)
}

/**
3 changes: 3 additions & 0 deletions feathr-impl/src/test/resources/simple-obs2.csv
@@ -0,0 +1,3 @@
mId,timestamp,aEmbedding,bEmbedding
1,2019-05-20,1,3
2,2019-05-19,2,4