Skip to content

Commit

Permalink
Initial functional commit for remote get_historical_features
Browse files Browse the repository at this point in the history
  • Loading branch information
dmartinol committed May 9, 2024
1 parent e7cd32f commit 0d0cffd
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 112 deletions.
182 changes: 98 additions & 84 deletions sdk/python/feast/infra/offline_stores/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pandas as pd
import pyarrow as pa
import pyarrow.parquet
from pydantic import StrictStr
from pydantic import StrictInt, StrictStr

from feast import OnDemandFeatureView
from feast.data_source import DataSource
Expand All @@ -17,61 +17,67 @@
RetrievalJob,
)
from feast.infra.registry.base_registry import BaseRegistry
from feast.infra.registry.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import log_exceptions_and_usage


class RemoteOfflineStoreConfig(FeastConfigBaseModel):
    """Configuration for the remote offline store.

    The remote offline store delegates all queries to an Arrow Flight
    server reachable at ``grpc://{host}:{port}``.
    """

    type: Literal["remote"] = "remote"
    # Offline store type selector; must be "remote" for this store.

    host: StrictStr
    # Host name (or IP) of the remote offline store's Arrow Flight server.

    port: Optional[StrictInt] = None
    # int: TCP port of the remote offline store server; None means the
    # caller/URL scheme default is used.


class RemoteRetrievalJob(RetrievalJob):
def __init__(
self,
config: RepoConfig,
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
# TODO add missing parameters from the OfflineStore API
self,
config: RepoConfig,
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
# TODO add missing parameters from the OfflineStore API
):
# Generate unique command identifier
self.command = str(uuid.uuid4())
# Initialize the client connection
self.client = pa.flight.connect(f"grpc://{config.offline_store.host}:{config.offline_store.port}")
self.client = pa.flight.connect(
f"grpc://{config.offline_store.host}:{config.offline_store.port}"
)
# Put API parameters
self._put_parameters(feature_refs, entity_df)

def _put_parameters(self, feature_refs, entity_df):
historical_flight_descriptor = pa.flight.FlightDescriptor.for_command(
self.command
)

entity_df_table = pa.Table.from_pandas(entity_df)
historical_flight_descriptor = pa.flight.FlightDescriptor.for_command(self.command)
writer, _ = self.client.do_put(historical_flight_descriptor,
entity_df_table.schema.with_metadata({
'command': self.command,
'api': 'get_historical_features',
'param': 'entity_df'}))
writer, _ = self.client.do_put(
historical_flight_descriptor,
entity_df_table.schema.with_metadata(
{
"command": self.command,
"api": "get_historical_features",
"param": "entity_df",
}
),
)
writer.write_table(entity_df_table)
writer.close()

features_array = pa.array(feature_refs)
features_batch = pa.RecordBatch.from_arrays([features_array], ['features'])
writer, _ = self.client.do_put(historical_flight_descriptor,
features_batch.schema.with_metadata({
'command': self.command,
'api': 'get_historical_features',
'param': 'features'}))
features_batch = pa.RecordBatch.from_arrays([features_array], ["features"])
writer, _ = self.client.do_put(
historical_flight_descriptor,
features_batch.schema.with_metadata(
{
"command": self.command,
"api": "get_historical_features",
"param": "features",
}
),
)
writer.write_batch(features_batch)
writer.close()

Expand All @@ -96,65 +102,73 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:


class RemoteOfflineStore(OfflineStore):
    """Offline store that forwards requests to a remote Arrow Flight server.

    Only ``get_historical_features`` is functional; the remaining OfflineStore
    API methods are not yet implemented and raise ``NotImplementedError``.
    """

    @staticmethod
    @log_exceptions_and_usage(offline_store="remote")
    def get_historical_features(
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Union[pd.DataFrame, str],
        registry: BaseRegistry,
        project: str,
        full_feature_names: bool = False,
    ) -> RemoteRetrievalJob:
        """Build a RemoteRetrievalJob that delegates retrieval to the server.

        Args:
            config: repo configuration; ``config.offline_store`` must be a
                RemoteOfflineStoreConfig carrying the server host/port.
            feature_views: feature views involved in the request (currently
                unused client-side; resolved by the server).
            feature_refs: feature references to retrieve.
            entity_df: entity dataframe (or query string) to join against.
            registry: feature registry (currently unused client-side).
            project: Feast project name (currently unused client-side).
            full_feature_names: whether to return fully qualified feature names.

        Returns:
            A RemoteRetrievalJob bound to the remote server.
        """
        # Fail fast if the repo is not configured for the remote offline store.
        assert isinstance(config.offline_store, RemoteOfflineStoreConfig)

        # TODO: extend RemoteRetrievalJob API with all method parameters
        return RemoteRetrievalJob(
            config=config, feature_refs=feature_refs, entity_df=entity_df
        )

    @staticmethod
    @log_exceptions_and_usage(offline_store="remote")
    def pull_all_from_table_or_query(
        config: RepoConfig,
        data_source: DataSource,
        join_key_columns: List[str],
        feature_name_columns: List[str],
        timestamp_field: str,
        start_date: datetime,
        end_date: datetime,
    ) -> RetrievalJob:
        # TODO Implementation here.
        raise NotImplementedError

    @staticmethod
    @log_exceptions_and_usage(offline_store="remote")
    def pull_latest_from_table_or_query(
        config: RepoConfig,
        data_source: DataSource,
        join_key_columns: List[str],
        feature_name_columns: List[str],
        timestamp_field: str,
        created_timestamp_column: Optional[str],
        start_date: datetime,
        end_date: datetime,
    ) -> RetrievalJob:
        # TODO Implementation here.
        raise NotImplementedError

    @staticmethod
    def write_logged_features(
        config: RepoConfig,
        data: Union[pyarrow.Table, Path],
        source: LoggingSource,
        logging_config: LoggingConfig,
        registry: BaseRegistry,
    ):
        # TODO Implementation here.
        raise NotImplementedError

    @staticmethod
    @log_exceptions_and_usage(offline_store="remote")
    def offline_write_batch(
        config: RepoConfig,
        feature_view: FeatureView,
        table: pyarrow.Table,
        progress: Optional[Callable[[int], Any]],
    ):
        # TODO Implementation here.
        raise NotImplementedError
Loading

0 comments on commit 0d0cffd

Please sign in to comment.