From 41b473c96635eec95f71f07d7dd23006282abb58 Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Mon, 3 Feb 2025 13:57:38 -0800 Subject: [PATCH 01/15] Add Couchbase Columnar and Sync Deps Signed-off-by: Elliot Scribner --- drop_col_bulk.py | 141 ++++++++++++++++++ .../feature_repo/data/online_store_for_pg.db | 0 .../requirements/py3.10-ci-requirements.txt | 2 + .../requirements/py3.11-ci-requirements.txt | 2 + .../requirements/py3.9-ci-requirements.txt | 2 + setup.py | 5 +- 6 files changed, 151 insertions(+), 1 deletion(-) create mode 100644 drop_col_bulk.py create mode 100644 go/internal/test/flexible_coyote/feature_repo/data/online_store_for_pg.db diff --git a/drop_col_bulk.py b/drop_col_bulk.py new file mode 100644 index 0000000000..bb19c450a2 --- /dev/null +++ b/drop_col_bulk.py @@ -0,0 +1,141 @@ +from couchbase_columnar.cluster import Cluster +from couchbase_columnar.credential import Credential +from typing import List, Dict +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +import logging +import threading + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def create_cluster(connstr: str, username: str, password: str) -> Cluster: + """Create cluster connection""" + cred = Credential.from_username_and_password(username, password) + return Cluster.create_instance(connstr, cred) + + +def get_collections(cluster: Cluster) -> List[str]: + """Get all collections with retry logic""" + query = """ + SELECT VALUE d.DatabaseName || '.' || d.DataverseName || '.' || d.DatasetName + FROM System.Metadata.`Dataset` d + WHERE d.DataverseName <> "Metadata"; + """ + + max_retries = 3 + for attempt in range(max_retries): + try: + res = cluster.execute_query(query) + return res.get_all_rows() + except Exception as e: + if attempt == max_retries - 1: + raise + logger.warning(f"Attempt {attempt + 1} failed: {e}. 
Retrying...") + time.sleep(2 ** attempt) + + +def drop_collection(cluster: Cluster, collection: str) -> tuple[bool, str]: + """ + Drop a single collection + Returns: (success: bool, status: str) + """ + try: + if collection == 'Default.Default.feast_driver_hourly_stats': + return False, "error" + drop_query = f'DROP COLLECTION {collection};' + cluster.execute_query(drop_query) + return True, "success" + except Exception as e: + error_str = str(e) + if '"code":24025' in error_str: # Collection not found + return False, "not_found" + else: + return False, "error" + + +class ProgressTracker: + def __init__(self, total: int): + self.lock = threading.Lock() + self.stats = { + 'successful': 0, + 'not_found': 0, + 'failed': 0, + 'processed': 0, + 'total': total + } + + def update(self, success: bool, status: str, collection: str): + with self.lock: + if success: + self.stats['successful'] += 1 + logger.info(f'Successfully dropped collection: {collection}') + elif status == "not_found": + self.stats['not_found'] += 1 + logger.warning(f'Collection not found (may be already dropped): {collection}') + else: + self.stats['failed'] += 1 + logger.error(f'Error dropping collection {collection}') + + self.stats['processed'] += 1 + logger.info( + f"Progress: {self.stats['processed']}/{self.stats['total']} collections processed " + f"({self.stats['successful']} successful, {self.stats['not_found']} not found, " + f"{self.stats['failed']} failed)" + ) + + +def process_collection(cluster: Cluster, collection: str, tracker: ProgressTracker): + """Process a single collection and update progress""" + success, status = drop_collection(cluster, collection) + tracker.update(success, status, collection) + + +def main() -> None: + connstr = 'couchbases://cb.9d1ccd1-9osjioll.cloud.couchbase.com' + username = 'username' + password = 'Password!123' + + # DONT DROP feast_driver_hourly_stats + # connstr = 'couchbases://cb.zldvu6s2qoj8zuoc.cloud.couchbase.com' + # username = 'username' + # password = 'Password!123' + + try: + cluster = create_cluster(connstr, username, password) + + logger.info("Fetching collections...") + collections = get_collections(cluster) + total_collections = len(collections) + logger.info(f'Found {total_collections} collections') + + # Initialize progress tracker + tracker = ProgressTracker(total_collections) + + # Process collections in parallel + with ThreadPoolExecutor(max_workers=3) as executor: + futures = [ + executor.submit(process_collection, cluster, collection, tracker) + for collection in collections + ] + # Wait for all tasks to complete + for future in as_completed(futures): + try: + future.result() + except Exception as e: + logger.error(f"Unexpected error in worker thread: {e}") + + # Final summary + logger.info("\nFinal results:") + logger.info(f"Successfully dropped: {tracker.stats['successful']}") + logger.info(f"Not found/already dropped: {tracker.stats['not_found']}") + logger.info(f"Failed: {tracker.stats['failed']}") + + except Exception as e: + logger.error(f"Critical error: {e}") + raise + + +if __name__ == '__main__': + main() diff --git a/go/internal/test/flexible_coyote/feature_repo/data/online_store_for_pg.db b/go/internal/test/flexible_coyote/feature_repo/data/online_store_for_pg.db new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 96948e78e2..9c0dc69d14 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ 
b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -130,6 +130,8 @@ comm==0.2.2 # ipywidgets couchbase==4.3.2 # via feast (setup.py) +couchbase-columnar==1.0.0 + # via feast (setup.py) coverage[toml]==7.6.10 # via pytest-cov cryptography==43.0.3 diff --git a/sdk/python/requirements/py3.11-ci-requirements.txt b/sdk/python/requirements/py3.11-ci-requirements.txt index 20976b0204..426677b3ed 100644 --- a/sdk/python/requirements/py3.11-ci-requirements.txt +++ b/sdk/python/requirements/py3.11-ci-requirements.txt @@ -128,6 +128,8 @@ comm==0.2.2 # ipywidgets couchbase==4.3.2 # via feast (setup.py) +couchbase-columnar==1.0.0 + # via feast (setup.py) coverage[toml]==7.6.10 # via pytest-cov cryptography==43.0.3 diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 00eb59c93d..8672d7b75c 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -132,6 +132,8 @@ comm==0.2.2 # ipywidgets couchbase==4.3.2 # via feast (setup.py) +couchbase-columnar==1.0.0 + # via feast (setup.py) coverage[toml]==7.6.10 # via pytest-cov cryptography==43.0.3 diff --git a/setup.py b/setup.py index f20c94d551..80d38e0154 100644 --- a/setup.py +++ b/setup.py @@ -147,7 +147,10 @@ SINGLESTORE_REQUIRED = ["singlestoredb<1.8.0"] -COUCHBASE_REQUIRED = ["couchbase==4.3.2"] +COUCHBASE_REQUIRED = [ + "couchbase==4.3.2", + "couchbase-columnar==1.0.0" +] MSSQL_REQUIRED = ["ibis-framework[mssql]>=9.0.0,<10"] From 3773bb78a44094086838f7ab806afa76333a4f25 Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Mon, 3 Feb 2025 14:01:50 -0800 Subject: [PATCH 02/15] Couchbase Columnar Offline Store Signed-off-by: Elliot Scribner --- .../couchbase_offline_store/__init__.py | 0 .../couchbase_offline_store/couchbase.py | 713 ++++++++++++++++++ .../couchbase_source.py | 399 ++++++++++ .../couchbase_offline_store/tests/__init__.py | 0 .../tests/data_source.py | 145 ++++ .../feast/infra/utils/couchbase/__init__.py | 0 .../infra/utils/couchbase/couchbase_utils.py | 13 + .../protos/feast/core/FeatureService_pb2.pyi | 21 + sdk/python/feast/repo_config.py | 1 + sdk/python/feast/type_map.py | 29 + 10 files changed, 1321 insertions(+) create mode 100644 sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/__init__.py create mode 100644 sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py create mode 100644 sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py create mode 100644 sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/__init__.py create mode 100644 sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py create mode 100644 sdk/python/feast/infra/utils/couchbase/__init__.py create mode 100644 sdk/python/feast/infra/utils/couchbase/couchbase_utils.py diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/__init__.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py new file mode 100644 index 0000000000..2c8198435d --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py @@ -0,0 +1,713 @@ +import contextlib +from dataclasses import asdict +from 
datetime import datetime, timedelta +from typing import ( + Any, + Callable, + ContextManager, + Dict, + Iterator, + KeysView, + List, + Literal, + Optional, + Tuple, + Union, + cast, +) + +import numpy as np +import pandas as pd +import pyarrow +import pyarrow as pa +from couchbase_columnar.cluster import Cluster +from couchbase_columnar.common.result import BlockingQueryResult +from couchbase_columnar.credential import Credential +from couchbase_columnar.options import QueryOptions +from jinja2 import BaseLoader, Environment +from pydantic import StrictFloat, StrictStr + +from feast.data_source import DataSource +from feast.errors import InvalidEntityType, ZeroRowsQueryResult +from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView +from feast.infra.offline_stores.offline_store import ( + OfflineStore, + RetrievalJob, + RetrievalMetadata, +) +from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.utils.couchbase.couchbase_utils import normalize_timestamp +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.saved_dataset import SavedDatasetStorage + +from ... import offline_utils +from .couchbase_source import ( + CouchbaseColumnarSource, + SavedDatasetCouchbaseColumnarStorage, +) + + +class CouchbaseColumnarOfflineStoreConfig(FeastConfigBaseModel): + """Offline store config for Couchbase Columnar""" + + type: Literal["couchbase"] = "couchbase" + + connection_string: Optional[StrictStr] = None + user: Optional[StrictStr] = None + password: Optional[StrictStr] = None + timeout: StrictFloat = 120 + + +class CouchbaseColumnarOfflineStore(OfflineStore): + @staticmethod + def pull_latest_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str], + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + """ + Fetch the latest rows for each join key. 
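+
+        The generated SQL++ query ranks rows per join key with a ROW_NUMBER()
+        window function, ordered by the event timestamp (and, if configured,
+        the created timestamp) descending, and keeps only rows where
+        _feast_row = 1.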
+ """ + assert isinstance(config.offline_store, CouchbaseColumnarOfflineStoreConfig) + assert isinstance(data_source, CouchbaseColumnarSource) + from_expression = data_source.get_table_query_string() + + partition_by_join_key_string = ", ".join(_append_alias(join_key_columns, "a")) + if partition_by_join_key_string != "": + partition_by_join_key_string = ( + "PARTITION BY " + partition_by_join_key_string + ) + timestamps = [timestamp_field] + if created_timestamp_column: + timestamps.append(created_timestamp_column) + timestamp_desc_string = " DESC, ".join(_append_alias(timestamps, "a")) + " DESC" + a_field_string = ", ".join( + _append_alias(join_key_columns + feature_name_columns + timestamps, "a") + ) + b_field_string = ", ".join( + _append_alias(join_key_columns + feature_name_columns + timestamps, "b") + ) + + start_date_normalized = normalize_timestamp(start_date) + end_date_normalized = normalize_timestamp(end_date) + + query = f""" + SELECT + {b_field_string} + {f", {repr(DUMMY_ENTITY_VAL)} AS {DUMMY_ENTITY_ID}" if not join_key_columns else ""} + FROM ( + SELECT {a_field_string}, + ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row + FROM {from_expression} a + WHERE a.{timestamp_field} BETWEEN '{start_date_normalized}' AND '{end_date_normalized}' + ) b + WHERE _feast_row = 1 + """ + + return CouchbaseColumnarRetrievalJob( + query=query, + config=config, + full_feature_names=False, + on_demand_feature_views=None, + timestamp_field=timestamp_field, + ) + + @staticmethod + def get_historical_features( + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Union[pd.DataFrame, str], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> RetrievalJob: + """ + Retrieve historical features using point-in-time joins. 
+ """ + assert isinstance(config.offline_store, CouchbaseColumnarOfflineStoreConfig) + for fv in feature_views: + assert isinstance(fv.batch_source, CouchbaseColumnarSource) + + entity_schema = _get_entity_schema(entity_df, config) + + entity_df_event_timestamp_col = ( + offline_utils.infer_event_timestamp_from_entity_df(entity_schema) + ) + + entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range( + entity_df, entity_df_event_timestamp_col, config + ) + + @contextlib.contextmanager + def query_generator() -> Iterator[str]: + source = cast(CouchbaseColumnarSource, feature_views[0].batch_source) + database = source.database + scope = source.scope + + table_name = ( + f"{database}.{scope}.{offline_utils.get_temp_entity_table_name()}" + ) + + _upload_entity_df(config, entity_df, table_name) + + expected_join_keys = offline_utils.get_expected_join_keys( + project, feature_views, registry + ) + + offline_utils.assert_expected_columns_in_entity_df( + entity_schema, expected_join_keys, entity_df_event_timestamp_col + ) + + query_context = offline_utils.get_feature_view_query_context( + feature_refs, + feature_views, + registry, + project, + entity_df_event_timestamp_range, + ) + + query_context_dict = [asdict(context) for context in query_context] + + try: + query = build_point_in_time_query( + query_context_dict, + left_table_query_string=table_name, + entity_df_event_timestamp_col=entity_df_event_timestamp_col, + entity_df_columns=entity_schema.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=full_feature_names, + ) + yield query + finally: + if table_name: + _execute_query( + config.offline_store, + f"DROP COLLECTION {table_name} IF EXISTS", + ) + + return CouchbaseColumnarRetrievalJob( + query=query_generator, + config=config, + full_feature_names=full_feature_names, + on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs( + feature_refs, project, registry + ), + metadata=RetrievalMetadata( + features=feature_refs, + keys=list(entity_schema.keys() - {entity_df_event_timestamp_col}), + min_event_timestamp=entity_df_event_timestamp_range[0], + max_event_timestamp=entity_df_event_timestamp_range[1], + ), + timestamp_field=entity_df_event_timestamp_col, + ) + + @staticmethod + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + """ + Fetch all rows from the specified table or query within the time range. 
+ """ + assert isinstance(config.offline_store, CouchbaseColumnarOfflineStoreConfig) + assert isinstance(data_source, CouchbaseColumnarSource) + from_expression = data_source.get_table_query_string() + + field_string = ", ".join( + join_key_columns + feature_name_columns + [timestamp_field] + ) + + start_date_normalized = normalize_timestamp(start_date) + end_date_normalized = normalize_timestamp(end_date) + + query = f""" + SELECT {field_string} + FROM {from_expression} + WHERE `{timestamp_field}` BETWEEN '{start_date_normalized}' AND '{end_date_normalized}' + """ + + return CouchbaseColumnarRetrievalJob( + query=query, + config=config, + full_feature_names=False, + on_demand_feature_views=None, + timestamp_field=timestamp_field, + ) + + @staticmethod + def offline_write_batch( + config: RepoConfig, + feature_view: FeatureView, + table: pyarrow.Table, + progress: Optional[Callable[[int], Any]], + ): + raise NotImplementedError("Couchbase offline_write_batch not implemented yet.") + + +class CouchbaseColumnarRetrievalJob(RetrievalJob): + def __init__( + self, + query: Union[str, Callable[[], ContextManager[str]]], + config: RepoConfig, + full_feature_names: bool, + timestamp_field: str, + on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None, + metadata: Optional[RetrievalMetadata] = None, + ): + if not isinstance(query, str): + self._query_generator = query + else: + + @contextlib.contextmanager + def query_generator() -> Iterator[str]: + assert isinstance(query, str) + yield query + + self._query_generator = query_generator + self._config = config + self._full_feature_names = full_feature_names + self._on_demand_feature_views = on_demand_feature_views or [] + self._metadata = metadata + self._timestamp_field = timestamp_field + + @property + def full_feature_names(self) -> bool: + return self._full_feature_names + + @property + def on_demand_feature_views(self) -> List[OnDemandFeatureView]: + return self._on_demand_feature_views + + def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: + # Use PyArrow to convert the result to a pandas DataFrame + return self._to_arrow_internal(timeout).to_pandas() + + def to_sql(self) -> str: + with self._query_generator() as query: + return query + + def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: + with self._query_generator() as query: + res = _execute_query(self._config.offline_store, query) + rows = res.get_all_rows() + + processed_rows = [] + for row in rows: + processed_row = {} + for key, value in row.items(): + if key == self._timestamp_field and value is not None: + # Parse and ensure timezone-aware datetime + processed_row[key] = pd.to_datetime(value, utc=True) + else: + processed_row[key] = np.nan if value is None else value + processed_rows.append(processed_row) + + # Convert to PyArrow table + table = pa.Table.from_pylist(processed_rows) + return table + + @property + def metadata(self) -> Optional[RetrievalMetadata]: + return self._metadata + + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = None, + ): + assert isinstance(storage, SavedDatasetCouchbaseColumnarStorage) + table_name = f"{storage.couchbase_options._database}.{storage.couchbase_options._scope}.{offline_utils.get_temp_entity_table_name()}" + _df_to_columnar(self.to_df(), table_name, self._config) + + +def _get_columnar_cluster(config: CouchbaseColumnarOfflineStoreConfig) -> Cluster: + assert config.connection_string is not None + assert 
config.user is not None + assert config.password is not None + + cred = Credential.from_username_and_password(config.user, config.password) + return Cluster.create_instance(config.connection_string, cred) + + +def _execute_query( + config: CouchbaseColumnarOfflineStoreConfig, + query: str, + named_params: Optional[Dict[str, Any]] = None, +) -> BlockingQueryResult: + cluster = _get_columnar_cluster(config) + return cluster.execute_query( + query, + QueryOptions( + named_parameters=named_params, timeout=timedelta(seconds=config.timeout) + ), + ) + + +def _df_to_columnar(df: pd.DataFrame, table_name: str, config: RepoConfig): + df_copy = df.copy() + insert_values = df_copy.apply( + lambda row: { + col: ( + normalize_timestamp(row[col], "%Y-%m-%dT%H:%M:%S.%f+00:00") + if isinstance(row[col], pd.Timestamp) + else row[col] + ) + for col in df_copy.columns + }, + axis=1, + ).tolist() + + create_collection_query = f"CREATE COLLECTION {table_name} IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;" + insert_query = f"INSERT INTO {table_name} ({insert_values});" + + _execute_query(config.offline_store, create_collection_query) + _execute_query(config.offline_store, insert_query) + + +def _upload_entity_df( + config: RepoConfig, entity_df: Union[pd.DataFrame, str], table_name: str +): + if isinstance(entity_df, pd.DataFrame): + _df_to_columnar(entity_df, table_name, config) + elif isinstance(entity_df, str): + # If the entity_df is a string (SQL query), create a Columnar collection out of it + create_collection_query = f""" + CREATE COLLECTION {table_name} IF NOT EXISTS + PRIMARY KEY(pk: UUID) AUTOGENERATED + AS {entity_df} + """ + _execute_query(config.offline_store, create_collection_query) + else: + raise InvalidEntityType(type(entity_df)) + + +def _get_entity_df_event_timestamp_range( + entity_df: Union[pd.DataFrame, str], + entity_df_event_timestamp_col: str, + config: RepoConfig, +) -> Tuple[datetime, datetime]: + if isinstance(entity_df, pd.DataFrame): + entity_df_event_timestamp = entity_df.loc[ + :, entity_df_event_timestamp_col + ].infer_objects() + if pd.api.types.is_string_dtype(entity_df_event_timestamp): + entity_df_event_timestamp = pd.to_datetime( + entity_df_event_timestamp, utc=True + ) + entity_df_event_timestamp_range = ( + entity_df_event_timestamp.min().to_pydatetime(), + entity_df_event_timestamp.max().to_pydatetime(), + ) + + elif isinstance(entity_df, str): + query = f""" + SELECT + MIN({entity_df_event_timestamp_col}) AS min, + MAX({entity_df_event_timestamp_col}) AS max + FROM ({entity_df}) AS tmp_alias + """ + + res = _execute_query(config.offline_store, query) + rows = res.get_all_rows() + + if not rows: + raise ZeroRowsQueryResult(query) + + # Convert the string timestamps to datetime objects + min_ts = pd.to_datetime(rows[0]["min"], utc=True).to_pydatetime() + max_ts = pd.to_datetime(rows[0]["max"], utc=True).to_pydatetime() + entity_df_event_timestamp_range = (min_ts, max_ts) + else: + raise InvalidEntityType(type(entity_df)) + return entity_df_event_timestamp_range + + +def _escape_column(column: str) -> str: + """Wrap column names in backticks to handle reserved words.""" + return f"`{column}`" + + +def _append_alias(field_names: List[str], alias: str) -> List[str]: + """Append alias to escaped column names.""" + return [f"{alias}.{_escape_column(field_name)}" for field_name in field_names] + + +def build_point_in_time_query( + feature_view_query_contexts: List[dict], + left_table_query_string: str, + entity_df_event_timestamp_col: str, + entity_df_columns: 
KeysView[str], + query_template: str, + full_feature_names: bool = False, +) -> str: + """Build point-in-time query between each feature view table and the entity dataframe for Couchbase Columnar""" + template = Environment(loader=BaseLoader()).from_string(source=query_template) + final_output_feature_names = list(entity_df_columns) + final_output_feature_names.extend( + [ + ( + f'{fv["name"]}__{fv["field_mapping"].get(feature, feature)}' + if full_feature_names + else fv["field_mapping"].get(feature, feature) + ) + for fv in feature_view_query_contexts + for feature in fv["features"] + ] + ) + + # Add additional fields to dict + template_context = { + "left_table_query_string": left_table_query_string, + "entity_df_event_timestamp_col": entity_df_event_timestamp_col, + "unique_entity_keys": set( + [entity for fv in feature_view_query_contexts for entity in fv["entities"]] + ), + "featureviews": feature_view_query_contexts, + "full_feature_names": full_feature_names, + "final_output_feature_names": final_output_feature_names, + } + + query = template.render(template_context) + return query + + +def get_couchbase_query_schema(config, entity_df: str) -> Dict[str, np.dtype]: + df_query = f"({entity_df}) AS sub" + res = _execute_query(config.offline_store, f"SELECT sub.* FROM {df_query} LIMIT 1") + rows = res.get_all_rows() + + if rows and len(rows) > 0: + # Get the first row + first_row = rows[0] + # Create dictionary mapping each column to dtype('O') + return {key: np.dtype("O") for key in first_row.keys()} + + return {} + + +def _get_entity_schema( + entity_df: Union[pd.DataFrame, str], + config: RepoConfig, +) -> Dict[str, np.dtype]: + if isinstance(entity_df, pd.DataFrame): + return dict(zip(entity_df.columns, entity_df.dtypes)) + + elif isinstance(entity_df, str): + return get_couchbase_query_schema(config, entity_df) + else: + raise InvalidEntityType(type(entity_df)) + + +MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """ +WITH entity_dataframe AS ( + SELECT e.*, + e.`{{entity_df_event_timestamp_col}}` AS entity_timestamp + {% for featureview in featureviews -%} + {% if featureview.entities -%} + ,CONCAT( + {% for entity in featureview.entities -%} + TOSTRING(e.`{{entity}}`), + {% endfor -%} + TOSTRING(e.`{{entity_df_event_timestamp_col}}`) + ) AS `{{featureview.name}}__entity_row_unique_id` + {% else -%} + ,TOSTRING(e.`{{entity_df_event_timestamp_col}}`) AS `{{featureview.name}}__entity_row_unique_id` + {% endif -%} + {% endfor %} + FROM {{ left_table_query_string }} e +), + +{% for featureview in featureviews %} + +`{{ featureview.name }}__entity_dataframe` AS ( + SELECT + {% if featureview.entities %}`{{ featureview.entities | join('`, `') }}`,{% endif %} + entity_timestamp, + `{{featureview.name}}__entity_row_unique_id` + FROM entity_dataframe + GROUP BY + {% if featureview.entities %}`{{ featureview.entities | join('`, `')}}`,{% endif %} + entity_timestamp, + `{{featureview.name}}__entity_row_unique_id` +), + +/* + This query template performs the point-in-time correctness join for a single feature set table + to the provided entity table. + + 1. We first join the current feature_view to the entity dataframe that has been passed. 
+    This JOIN has the following logic:
+        - For each row of the entity dataframe, only keep the rows where the `timestamp_field`
+          is less than the one provided in the entity dataframe
+        - If there is a TTL for the current feature_view, also keep the rows where the `timestamp_field`
+          is higher than the one provided minus the TTL
+        - For each row, join on the entity key and retrieve the `entity_row_unique_id` that has been
+          computed previously
+
+    The output of this CTE will contain all the necessary information and will already have
+    filtered out most of the data that is not relevant.
+*/
+`{{ featureview.name }}__subquery` AS (
+    LET max_ts = (SELECT RAW MAX(entity_timestamp) FROM entity_dataframe)[0]
+    SELECT s.* FROM (
+        LET min_ts = (SELECT RAW MIN(entity_timestamp) FROM entity_dataframe)[0]
+        SELECT
+            `{{ featureview.timestamp_field }}` as event_timestamp,
+            {{ '`' ~ featureview.created_timestamp_column ~ '` as created_timestamp,' if featureview.created_timestamp_column else '' }}
+            {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
+            {% for feature in featureview.features -%}
+                `{{ feature }}` as {% if full_feature_names %}`{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}`{% else %}`{{ featureview.field_mapping.get(feature, feature) }}`{% endif %}{% if not loop.last %}, {% endif %}
+            {%- endfor %}
+        FROM {{ featureview.table_subquery }} AS sub
+        WHERE `{{ featureview.timestamp_field }}` <= max_ts
+        {% if featureview.ttl == 0 %}{% else %}
+        AND date_diff_str(min_ts, `{{ featureview.timestamp_field }}`, "second") <= {{ featureview.ttl }}
+        {% endif %}
+    ) s
+),
+
+`{{ featureview.name }}__base` AS (
+    SELECT
+        subquery.*,
+        entity_dataframe.entity_timestamp,
+        entity_dataframe.`{{featureview.name}}__entity_row_unique_id`
+    FROM `{{ featureview.name }}__subquery` AS subquery
+    INNER JOIN `{{ featureview.name }}__entity_dataframe` AS entity_dataframe
+    ON TRUE
+        AND subquery.event_timestamp <= entity_dataframe.entity_timestamp
+        {% if featureview.ttl == 0 %}{% else %}
+        AND date_diff_str(entity_dataframe.entity_timestamp, subquery.event_timestamp, "second") <= {{ featureview.ttl }}
+        {% endif %}
+        {% for entity in featureview.entities %}
+        AND subquery.`{{ entity }}` = entity_dataframe.`{{ entity }}`
+        {% endfor %}
+),
+
+/*
+    2. If the `created_timestamp_column` has been set, we need to
+    deduplicate the data first. This is done by calculating the
+    `MAX(created_at_timestamp)` for each event_timestamp.
+    We then join the data in the next CTE.
+*/
+{% if featureview.created_timestamp_column %}
+`{{ featureview.name }}__dedup` AS (
+    SELECT
+        `{{featureview.name}}__entity_row_unique_id`,
+        event_timestamp,
+        MAX(created_timestamp) AS created_timestamp
+    FROM `{{ featureview.name }}__base`
+    GROUP BY `{{featureview.name}}__entity_row_unique_id`, event_timestamp
+),
+{% endif %}
+
+/*
+    3. The data has been filtered during the first CTE "*__base".
+    Thus we only need to compute the latest timestamp of each feature.
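+    (When a created timestamp column is configured, ties on event_timestamp
+    are broken by the latest created_timestamp via the "__dedup" CTE above.)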
+*/ +`{{ featureview.name }}__latest` AS ( + SELECT + event_timestamp + {% if featureview.created_timestamp_column %},created_timestamp{% endif %}, + `{{featureview.name}}__entity_row_unique_id` + FROM ( + SELECT base.*, + ROW_NUMBER() OVER( + PARTITION BY base.`{{featureview.name}}__entity_row_unique_id` + ORDER BY base.event_timestamp DESC + {% if featureview.created_timestamp_column %}, base.created_timestamp DESC{% endif %} + ) AS row_number + FROM `{{ featureview.name }}__base` base + {% if featureview.created_timestamp_column %} + INNER JOIN `{{ featureview.name }}__dedup` dedup + ON base.`{{featureview.name}}__entity_row_unique_id` = dedup.`{{featureview.name}}__entity_row_unique_id` + AND base.event_timestamp = dedup.event_timestamp + AND base.created_timestamp = dedup.created_timestamp + {% endif %} + ) AS sub + WHERE sub.row_number = 1 +), + +/* + 4. Once we know the latest value of each feature for a given timestamp, + we can join again the data back to the original "base" dataset +*/ +`{{ featureview.name }}__cleaned` AS ( + SELECT base.* + FROM `{{ featureview.name }}__base` AS base + INNER JOIN `{{ featureview.name }}__latest` AS latest + ON base.`{{featureview.name}}__entity_row_unique_id` = latest.`{{featureview.name}}__entity_row_unique_id` + AND base.event_timestamp = latest.event_timestamp + {% if featureview.created_timestamp_column %} + AND base.created_timestamp = latest.created_timestamp + {% endif %} +){% if not loop.last %},{% endif %} + +{% endfor %} + +/* + Joins the outputs of multiple time travel joins to a single table. + The entity_dataframe dataset being our source of truth here. + */ +SELECT DISTINCT + {%- set fields = [] %} + {%- for feature_name in final_output_feature_names %} + {%- if '__' not in feature_name %} + {%- set ns = namespace(found=false) %} + {%- for fv in featureviews %} + {%- for feature in fv.features %} + {%- if feature == feature_name %} + {%- set ns.found = true %} + {%- if full_feature_names %} + {%- set _ = fields.append('IFMISSINGORNULL(`' ~ fv.name ~ '_final`.`' ~ fv.name ~ '__' ~ feature ~ '`, null) AS `' ~ fv.name ~ '__' ~ feature ~ '`') %} + {%- else %} + {%- set _ = fields.append('IFMISSINGORNULL(`' ~ fv.name ~ '_final`.`' ~ feature ~ '`, null) AS `' ~ feature ~ '`') %} + {%- endif %} + {%- endif %} + {%- endfor %} + {%- endfor %} + {%- if not ns.found %} + {%- if feature_name == 'feature_name' %} + {%- set _ = fields.append('IFMISSINGORNULL(`field_mapping_final`.`' ~ feature_name ~ '`, null) AS `' ~ feature_name ~ '`') %} + {%- else %} + {%- set _ = fields.append('main_entity.`' ~ feature_name ~ '`') %} + {%- endif %} + {%- endif %} + {%- else %} + {%- set feature_parts = feature_name.split('__') %} + {%- set fv_name = feature_parts[0] %} + {%- set feature = feature_parts[1] %} + {%- if feature_name == 'field_mapping__feature_name' %} + {%- set _ = fields.append('IFMISSINGORNULL(`field_mapping_final`.`field_mapping__feature_name`, null) AS `field_mapping__feature_name`') %} + {%- else %} + {%- set _ = fields.append('IFMISSINGORNULL(`' ~ fv_name ~ '_final`.`' ~ feature_name ~ '`, null) AS `' ~ feature_name ~ '`') %} + {%- endif %} + {%- endif %} + {%- endfor %} + {{ fields | reject('none') | join(',\n ') }} +FROM entity_dataframe AS main_entity + +{%- for featureview in featureviews %} +LEFT JOIN ( + SELECT + `{{featureview.name}}__entity_row_unique_id`, + {% for feature in featureview.features -%} + IFMISSINGORNULL(`{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else 
%}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}`, null) AS `{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}`{% if not loop.last %},{% endif %} + {% endfor %} + FROM `{{ featureview.name }}__cleaned` +) AS `{{featureview.name}}_final` +ON main_entity.`{{featureview.name}}__entity_row_unique_id` = `{{featureview.name}}_final`.`{{featureview.name}}__entity_row_unique_id` +{% endfor %} +""" diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py new file mode 100644 index 0000000000..46ddfba19a --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py @@ -0,0 +1,399 @@ +import json +from datetime import timedelta +from typing import Any, Callable, Dict, Iterable, Optional, Tuple + +from couchbase_columnar.cluster import Cluster +from couchbase_columnar.credential import Credential +from couchbase_columnar.options import QueryOptions +from typeguard import typechecked + +from feast.data_source import DataSource +from feast.errors import DataSourceNoNameException, ZeroColumnQueryResult +from feast.feature_logging import LoggingDestination +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.FeatureService_pb2 import ( + LoggingConfig as LoggingConfigProto, +) +from feast.protos.feast.core.SavedDataset_pb2 import ( + SavedDatasetStorage as SavedDatasetStorageProto, +) +from feast.repo_config import RepoConfig +from feast.saved_dataset import SavedDatasetStorage +from feast.type_map import ValueType, cb_columnar_type_to_feast_value_type + + +@typechecked +class CouchbaseColumnarSource(DataSource): + """A CouchbaseColumnarSource object defines a data source that a CouchbaseColumnarOfflineStore class can use.""" + + def __init__( + self, + name: Optional[str] = None, + query: Optional[str] = None, + database: Optional[str] = "Default", + scope: Optional[str] = "Default", + collection: Optional[str] = None, + timestamp_field: Optional[str] = "", + created_timestamp_column: Optional[str] = "", + field_mapping: Optional[Dict[str, str]] = None, + description: Optional[str] = "", + tags: Optional[Dict[str, str]] = None, + owner: Optional[str] = "", + ): + """Creates a CouchbaseColumnarSource object. + + Args: + name: Name of CouchbaseColumnarSource, which should be unique within a project. + query: SQL++ query that will be used to fetch the data. + database: Columnar database name. + scope: Columnar scope name. + collection: Columnar collection name. + timestamp_field (optional): Event timestamp field used for point-in-time joins of + feature values. + created_timestamp_column (optional): Timestamp column indicating when the row + was created, used for deduplicating rows. + field_mapping (optional): A dictionary mapping of field names in this data + source to feature names in a feature table or view. Only used for feature + fields, not entity or timestamp fields. + description (optional): A human-readable description. + tags (optional): A dictionary of key-value pairs to store arbitrary metadata. + owner (optional): The owner of the data source, typically the email of the primary + maintainer. 
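+
+        Example (illustrative only; the collection and field names below are
+        placeholders):
+
+            driver_stats = CouchbaseColumnarSource(
+                name="driver_hourly_stats",
+                database="Default",
+                scope="Default",
+                collection="driver_stats",
+                timestamp_field="event_timestamp",
+                created_timestamp_column="created",
+            )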
+ """ + self._couchbase_options = CouchbaseColumnarOptions( + name=name, + query=query, + database=database, + scope=scope, + collection=collection, + ) + + # If no name, use the collection as the default name. + if name is None and collection is None: + raise DataSourceNoNameException() + name = name or collection + assert name + + super().__init__( + name=name, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + field_mapping=field_mapping, + description=description, + tags=tags, + owner=owner, + ) + + def __hash__(self): + return super().__hash__() + + def __eq__(self, other): + if not isinstance(other, CouchbaseColumnarSource): + raise TypeError( + "Comparisons should only involve CouchbaseColumnarSource class objects." + ) + + return ( + super().__eq__(other) + and self._couchbase_options._query == other._couchbase_options._query + and self.timestamp_field == other.timestamp_field + and self.created_timestamp_column == other.created_timestamp_column + and self.field_mapping == other.field_mapping + ) + + @staticmethod + def from_proto(data_source: DataSourceProto): + assert data_source.HasField("custom_options") + + couchbase_options = json.loads(data_source.custom_options.configuration) + + return CouchbaseColumnarSource( + name=couchbase_options["name"], + query=couchbase_options["query"], + database=couchbase_options["database"], + scope=couchbase_options["scope"], + collection=couchbase_options["collection"], + field_mapping=dict(data_source.field_mapping), + timestamp_field=data_source.timestamp_field, + created_timestamp_column=data_source.created_timestamp_column, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, + ) + + def to_proto(self) -> DataSourceProto: + data_source_proto = DataSourceProto( + name=self.name, + type=DataSourceProto.CUSTOM_SOURCE, + data_source_class_type="feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase_source.CouchbaseColumnarSource", + field_mapping=self.field_mapping, + custom_options=self._couchbase_options.to_proto(), + description=self.description, + tags=self.tags, + owner=self.owner, + ) + + data_source_proto.timestamp_field = self.timestamp_field + data_source_proto.created_timestamp_column = self.created_timestamp_column + + return data_source_proto + + def validate(self, config: RepoConfig): + pass + + @staticmethod + def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: + # Define the type conversion for Couchbase fields to Feast ValueType as needed + return cb_columnar_type_to_feast_value_type + + def _infer_composite_type(self, field: Dict[str, Any]) -> str: + """ + Infers type signature for a field, rejecting complex nested structures that + aren't compatible with Feast's type system. + + Args: + field: Dictionary containing field information including type and nested structures + + Returns: + String representation of the type, or raises ValueError for incompatible types + + Raises: + ValueError: If field contains complex nested structures not supported by Feast + """ + base_type = field.get("field-type", "unknown").lower() + + if base_type == "array": + if "list" not in field or not field["list"]: + return "array" + + item_type = field["list"][0] + if item_type.get("field-type") == "object": + raise ValueError( + "Complex object types in arrays are not supported by Feast. " + "Arrays must contain homogeneous primitive values." 
+ ) + + # Only allow arrays of primitive types + inner_type = item_type.get("field-type", "unknown") + if inner_type in ["array", "multiset", "object"]: + raise ValueError( + "Nested collection types are not supported by Feast. " + "Arrays can only be one level deep." + ) + + return f"array<{inner_type}>" + + elif base_type == "object": + raise ValueError( + "Complex object types are not supported by Feast. " + "Only primitive types and homogeneous arrays are allowed." + ) + + elif base_type == "multiset": + raise ValueError( + "Multiset types are not supported by Feast. " + "Only primitive types and homogeneous arrays are allowed." + ) + + return base_type + + def get_table_column_names_and_types( + self, config: RepoConfig + ) -> Iterable[Tuple[str, str]]: + cred = Credential.from_username_and_password( + config.offline_store.user, config.offline_store.password + ) + cluster = Cluster.create_instance(config.offline_store.connection_string, cred) + + query_context = self.get_table_query_string() + query = f""" + SELECT get_object_fields( + CASE WHEN ARRAY_LENGTH(OBJECT_PAIRS(t)) = 1 AND OBJECT_PAIRS(t)[0].`value` IS NOT MISSING + THEN OBJECT_PAIRS(t)[0].`value` + ELSE t + END + ) AS field_types + FROM {query_context} AS t + LIMIT 1; + """ + + result = cluster.execute_query( + query, QueryOptions(timeout=timedelta(seconds=500)) + ) + if not result: + raise ZeroColumnQueryResult(query) + + rows = result.get_all_rows() + field_type_pairs = [] + if rows and rows[0]: + # Accessing the "field_types" array from the first row + field_types_list = rows[0].get("field_types", []) + for field in field_types_list: + field_name = field.get("field-name", "unknown") + # if field_name == "pk": + # continue + field_type = self._infer_composite_type(field) + field_type_pairs.append((field_name, field_type)) + return field_type_pairs + + def get_table_query_string(self) -> str: + if ( + self._couchbase_options._database + and self._couchbase_options._scope + and self._couchbase_options._collection + ): + return f"`{self._couchbase_options._database}`.`{self._couchbase_options._scope}`.`{self._couchbase_options._collection}`" + else: + return f"({self._couchbase_options._query})" + + @property + def database(self) -> str: + """Returns the database name.""" + return self._couchbase_options._database + + @property + def scope(self) -> str: + """Returns the scope name.""" + return self._couchbase_options._scope + + +class CouchbaseColumnarOptions: + def __init__( + self, + name: Optional[str], + query: Optional[str], + database: Optional[str], + scope: Optional[str], + collection: Optional[str], + ): + self._name = name or "" + self._query = query or "" + self._database = database or "" + self._scope = scope or "" + self._collection = collection or "" + + @classmethod + def from_proto(cls, couchbase_options_proto: DataSourceProto.CustomSourceOptions): + config = json.loads(couchbase_options_proto.configuration.decode("utf8")) + couchbase_options = cls( + name=config["name"], + query=config["query"], + database=config["database"], + scope=config["scope"], + collection=config["collection"], + ) + + return couchbase_options + + def to_proto(self) -> DataSourceProto.CustomSourceOptions: + couchbase_options_proto = DataSourceProto.CustomSourceOptions( + configuration=json.dumps( + { + "name": self._name, + "query": self._query, + "database": self._database, + "scope": self._scope, + "collection": self._collection, + } + ).encode() + ) + return couchbase_options_proto + + +class 
SavedDatasetCouchbaseColumnarStorage(SavedDatasetStorage): + _proto_attr_name = "custom_storage" + + couchbase_options: CouchbaseColumnarOptions + + def __init__(self, database_ref: str, scope_ref: str, collection_ref: str): + self.couchbase_options = CouchbaseColumnarOptions( + database=database_ref, + scope=scope_ref, + collection=collection_ref, + name=None, + query=None, + ) + + @staticmethod + def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage: + return SavedDatasetCouchbaseColumnarStorage( + database_ref=CouchbaseColumnarOptions.from_proto( + storage_proto.custom_storage + )._database, + scope_ref=CouchbaseColumnarOptions.from_proto( + storage_proto.custom_storage + )._scope, + collection_ref=CouchbaseColumnarOptions.from_proto( + storage_proto.custom_storage + )._collection, + ) + + def to_proto(self) -> SavedDatasetStorageProto: + return SavedDatasetStorageProto( + custom_storage=self.couchbase_options.to_proto() + ) + + def to_data_source(self) -> DataSource: + return CouchbaseColumnarSource( + database=self.couchbase_options._database, + scope=self.couchbase_options._scope, + collection=self.couchbase_options._collection, + ) + + +class CouchbaseColumnarLoggingDestination(LoggingDestination): + """ + Couchbase Columnar implementation of a logging destination. + """ + + database: str + scope: str + table_name: str + + _proto_kind = "couchbase_columnar_destination" + + def __init__(self, *, database: str, scope: str, table_name: str): + """ + Args: + database: The Couchbase database name + scope: The Couchbase scope name + table_name: The Couchbase collection name to log features into + """ + self.database = database + self.scope = scope + self.table_name = table_name + + def to_data_source(self) -> DataSource: + """ + Returns a data source object representing the logging destination. + """ + return CouchbaseColumnarSource( + database=self.database, + scope=self.scope, + collection=self.table_name, + ) + + def to_proto(self) -> LoggingConfigProto: + """ + Converts the logging destination to its protobuf representation. + """ + return LoggingConfigProto( + couchbase_columnar_destination=LoggingConfigProto.CouchbaseColumnarDestination( + database=self.database, + scope=self.scope, + collection=self.table_name, + ) + ) + + @classmethod + def from_proto( + cls, config_proto: LoggingConfigProto + ) -> "CouchbaseColumnarLoggingDestination": + """ + Creates a CouchbaseColumnarLoggingDestination from its protobuf representation. 
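+
+        Args:
+            config_proto: A LoggingConfigProto whose couchbase_columnar_destination
+                field carries the destination database, scope, and collection names.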
+ """ + return CouchbaseColumnarLoggingDestination( + database=config_proto.CouchbaseColumnarDestination.database, + scope=config_proto.CouchbaseColumnarDestination.scope, + table_name=config_proto.CouchbaseColumnarDestination.collection, + ) diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/__init__.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py new file mode 100644 index 0000000000..02c806c866 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py @@ -0,0 +1,145 @@ +import json +import os +import uuid +from datetime import timedelta +from typing import Dict, List, Optional + +import pandas as pd +from couchbase_columnar.cluster import Cluster +from couchbase_columnar.credential import Credential +from couchbase_columnar.options import QueryOptions + +from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination +from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase import ( + CouchbaseColumnarOfflineStoreConfig, +) +from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase_source import ( + CouchbaseColumnarLoggingDestination, + CouchbaseColumnarSource, +) +from feast.infra.utils.couchbase.couchbase_utils import normalize_timestamp +from feast.repo_config import FeastConfigBaseModel +from tests.integration.feature_repos.universal.data_source_creator import ( + DataSourceCreator, +) + +COUCHBASE_COLUMNAR_DATABASE = "Default" +COUCHBASE_COLUMNAR_SCOPE = "Default" + + +class CouchbaseColumnarDataSourceCreator(DataSourceCreator): + collections: List[str] = [] + + def __init__(self, project_name: str, *args, **kwargs): + super().__init__(project_name) + self.offline_store_config = CouchbaseColumnarOfflineStoreConfig( + type="couchbase", + connection_string=os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"], + user=os.environ["COUCHBASE_COLUMNAR_USER"], + password=os.environ["COUCHBASE_COLUMNAR_PASSWORD"], + timeout=480, + ) + + def create_data_source( + self, + df: pd.DataFrame, + destination_name: str, + created_timestamp_column="created_ts", + field_mapping: Optional[Dict[str, str]] = None, + timestamp_field: Optional[str] = "ts", + ) -> DataSource: + def format_row(row): + """Convert row to dictionary, handling NaN and timestamps""" + return { + col: ( + normalize_timestamp(row[col]) + if isinstance(row[col], pd.Timestamp) + else None + if pd.isna(row[col]) + else row[col] + ) + for col in row.index + } + + collection_name = self.get_prefixed_collection_name(destination_name) + + cred = Credential.from_username_and_password( + self.offline_store_config.user, self.offline_store_config.password + ) + cluster = Cluster.create_instance( + self.offline_store_config.connection_string, cred + ) + + create_cluster_query = f"CREATE ANALYTICS COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.{collection_name} IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;" + cluster.execute_query( + create_cluster_query, + QueryOptions(timeout=timedelta(seconds=self.offline_store_config.timeout)), + ) + + values_list = df.apply(format_row, axis=1).apply(json.dumps).tolist() + values_clause = ",\n ".join(values_list) + + insert_query = f""" + 
INSERT INTO `{COUCHBASE_COLUMNAR_DATABASE}`.`{COUCHBASE_COLUMNAR_SCOPE}`.`{collection_name}` ([ + {values_clause} + ]) + """ + cluster.execute_query( + insert_query, + QueryOptions(timeout=timedelta(seconds=self.offline_store_config.timeout)), + ) + + self.collections.append(collection_name) + + return CouchbaseColumnarSource( + name=collection_name, + query=f"SELECT VALUE v FROM {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.`{collection_name}` v", + database=COUCHBASE_COLUMNAR_DATABASE, + scope=COUCHBASE_COLUMNAR_SCOPE, + collection=collection_name, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + field_mapping=field_mapping or {"ts_1": "ts"}, + ) + + def create_saved_dataset_destination(self): + raise NotImplementedError + + def create_logged_features_destination(self) -> LoggingDestination: + collection = self.get_prefixed_collection_name( + f"logged_features_{str(uuid.uuid4()).replace('-', '_')}" + ) + self.collections.append(collection) + return CouchbaseColumnarLoggingDestination( + table_name=collection, + database=COUCHBASE_COLUMNAR_DATABASE, + scope=COUCHBASE_COLUMNAR_SCOPE, + ) + + def create_offline_store_config(self) -> FeastConfigBaseModel: + return self.offline_store_config + + def get_prefixed_collection_name(self, suffix: str) -> str: + return f"{self.project_name}_{suffix}" + + def teardown(self): + cred = Credential.from_username_and_password( + self.offline_store_config.user, self.offline_store_config.password + ) + cluster = Cluster.create_instance( + self.offline_store_config.connection_string, cred + ) + + for collection in self.collections: + query = f"DROP COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.`{collection}` IF EXISTS;" + try: + cluster.execute_query( + query, + QueryOptions( + timeout=timedelta(seconds=self.offline_store_config.timeout) + ), + ) + print(f"Successfully dropped collection: {collection}") + except Exception as e: + print(f"Error dropping collection {collection}: {e}") diff --git a/sdk/python/feast/infra/utils/couchbase/__init__.py b/sdk/python/feast/infra/utils/couchbase/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/infra/utils/couchbase/couchbase_utils.py b/sdk/python/feast/infra/utils/couchbase/couchbase_utils.py new file mode 100644 index 0000000000..005729274e --- /dev/null +++ b/sdk/python/feast/infra/utils/couchbase/couchbase_utils.py @@ -0,0 +1,13 @@ +from datetime import datetime, timezone + + +def normalize_timestamp( + dt: datetime, target_format: str = "%Y-%m-%dT%H:%M:%S%z" +) -> str: + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) # Assume UTC for naive datetimes + # Convert to UTC + utc_dt = dt.astimezone(timezone.utc) + # Format with strftime + formatted = utc_dt.strftime(target_format) + return formatted diff --git a/sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi b/sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi index 0b1c0baa87..0652ce0216 100644 --- a/sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi +++ b/sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi @@ -196,6 +196,26 @@ class LoggingConfig(google.protobuf.message.Message): table_name: builtins.str = ..., ) -> None: ... def ClearField(self, field_name: typing_extensions.Literal["table_name", b"table_name"]) -> None: ... 
+ class CouchbaseColumnarDestination(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + DATABASE_FIELD_NUMBER: builtins.int + SCOPE_FIELD_NUMBER: builtins.int + COLLECTION_FIELD_NUMBER: builtins.int + database: builtins.str + """Destination database name""" + scope: builtins.str + """Destination scope name""" + collection: builtins.str + """Destination collection name""" + def __init__( + self, + *, + database: builtins.str = ..., + scope: builtins.str = ..., + collection: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["database", b"database", "scope", b"scope", "collection", b"collection"]) -> None: ... class CustomDestination(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -258,6 +278,7 @@ class LoggingConfig(google.protobuf.message.Message): snowflake_destination: global___LoggingConfig.SnowflakeDestination | None = ..., custom_destination: global___LoggingConfig.CustomDestination | None = ..., athena_destination: global___LoggingConfig.AthenaDestination | None = ..., + couchbase_columnar_destination: global___LoggingConfig.CouchbaseColumnarDestination | None = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["athena_destination", b"athena_destination", "bigquery_destination", b"bigquery_destination", "custom_destination", b"custom_destination", "destination", b"destination", "file_destination", b"file_destination", "redshift_destination", b"redshift_destination", "snowflake_destination", b"snowflake_destination"]) -> builtins.bool: ... def ClearField(self, field_name: typing_extensions.Literal["athena_destination", b"athena_destination", "bigquery_destination", b"bigquery_destination", "custom_destination", b"custom_destination", "destination", b"destination", "file_destination", b"file_destination", "redshift_destination", b"redshift_destination", "sample_rate", b"sample_rate", "snowflake_destination", b"snowflake_destination"]) -> None: ... 
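
For reference, the store registered below is selected via the offline_store
section of feature_store.yaml. A minimal sketch, assuming placeholder values
for the connection string and credentials:

    offline_store:
        type: couchbase
        connection_string: couchbases://<columnar-connection-string>
        user: <username>
        password: <password>
        timeout: 120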
diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py
index d943caa4c1..740acfad59 100644
--- a/sdk/python/feast/repo_config.py
+++ b/sdk/python/feast/repo_config.py
@@ -98,6 +98,7 @@
     "mssql": "feast.infra.offline_stores.contrib.mssql_offline_store.mssql.MsSqlServerOfflineStore",
     "duckdb": "feast.infra.offline_stores.duckdb.DuckDBOfflineStore",
     "remote": "feast.infra.offline_stores.remote.RemoteOfflineStore",
+    "couchbase": "feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase.CouchbaseColumnarOfflineStore",
 }
 
 FEATURE_SERVER_CONFIG_CLASS_FOR_TYPE = {
diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py
index 8e3941b05b..526a5346e0 100644
--- a/sdk/python/feast/type_map.py
+++ b/sdk/python/feast/type_map.py
@@ -1077,3 +1077,32 @@ def pa_to_athena_value_type(pa_type: "pyarrow.DataType") -> str:
     }
 
     return type_map[pa_type_as_str]
+
+
+def cb_columnar_type_to_feast_value_type(type_str: str) -> ValueType:
+    """
+    Convert a Couchbase Columnar type string to a Feast ValueType.
+    """
+    type_map: Dict[str, ValueType] = {
+        # primitive types
+        "boolean": ValueType.BOOL,
+        "string": ValueType.STRING,
+        "bigint": ValueType.INT64,
+        "double": ValueType.DOUBLE,
+        # special types
+        "null": ValueType.NULL,
+        "missing": ValueType.UNKNOWN,
+        # composite types
+        "object": ValueType.UNKNOWN,
+        "array": ValueType.UNKNOWN,
+        "multiset": ValueType.UNKNOWN,
+        "uuid": ValueType.STRING,
+    }
+    value = (
+        type_map[type_str.lower()]
+        if type_str.lower() in type_map
+        else ValueType.UNKNOWN
+    )
+    if value == ValueType.UNKNOWN:
+        print("unknown type:", type_str)
+    return value

From d48be3b20a16a71f39bce1eace0b3284b2f7f925 Mon Sep 17 00:00:00 2001
From: Elliot Scribner
Date: Mon, 3 Feb 2025 14:05:04 -0800
Subject: [PATCH 03/15] Testing Config

Signed-off-by: Elliot Scribner
---
 Makefile                                      | 27 +++++++++++++++++++
 .../couchbase_columnar_repo_configuration.py  | 20 ++++++++++++++
 2 files changed, 47 insertions(+)
 create mode 100644 sdk/python/feast/infra/offline_stores/contrib/couchbase_columnar_repo_configuration.py

diff --git a/Makefile b/Makefile
index c199eb3a5e..21e7e36420 100644
--- a/Makefile
+++ b/Makefile
@@ -402,6 +402,33 @@ test-python-universal-qdrant-online:
 		-k "test_retrieve_online_documents" \
 		sdk/python/tests/integration/online_store/test_universal_online.py
 
+# To use Couchbase as an offline store, you need to create a Couchbase Capella Columnar cluster on cloud.couchbase.com.
+# Set the environment variables COUCHBASE_COLUMNAR_CONNECTION_STRING, COUCHBASE_COLUMNAR_USER, and COUCHBASE_COLUMNAR_PASSWORD
+# to the details of your Couchbase Columnar cluster.
+test-python-universal-couchbase-offline:
+	PYTHONPATH='.' 
\ + FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.couchbase_columnar_repo_configuration \ + PYTEST_PLUGINS=feast.infra.offline_stores.contrib.couchbase_offline_store.tests \ + COUCHBASE_COLUMNAR_CONNECTION_STRING=couchbases:// \ + COUCHBASE_COLUMNAR_USER=username \ + COUCHBASE_COLUMNAR_PASSWORD=password \ + python -m pytest -n 8 --integration \ + -k "not test_historical_retrieval_with_validation and \ + not test_historical_features_persisting and \ + not test_universal_cli and \ + not test_go_feature_server and \ + not test_feature_logging and \ + not test_reorder_columns and \ + not test_logged_features_validation and \ + not test_lambda_materialization_consistency and \ + not test_offline_write and \ + not test_push_features_to_offline_store and \ + not gcs_registry and \ + not s3_registry and \ + not test_snowflake and \ + not test_universal_types" \ + sdk/python/tests + test-python-universal-couchbase-online: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.couchbase_repo_configuration \ diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_columnar_repo_configuration.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_columnar_repo_configuration.py new file mode 100644 index 0000000000..745a074a75 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_columnar_repo_configuration.py @@ -0,0 +1,20 @@ +from feast.infra.offline_stores.contrib.couchbase_offline_store.tests.data_source import ( + CouchbaseColumnarDataSourceCreator, +) +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.repo_configuration import REDIS_CONFIG +from tests.integration.feature_repos.universal.online_store.redis import ( + RedisOnlineStoreCreator, +) + +FULL_REPO_CONFIGS = [ + IntegrationTestRepoConfig( + provider="aws", + offline_store_creator=CouchbaseColumnarDataSourceCreator, + ), +] + +AVAILABLE_OFFLINE_STORES = [("aws", CouchbaseColumnarDataSourceCreator)] +AVAILABLE_ONLINE_STORES = {"redis": (REDIS_CONFIG, RedisOnlineStoreCreator)} From e0cf6ded578deb793a24096f5e1872c4cdef54d2 Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Wed, 5 Feb 2025 11:16:31 -0800 Subject: [PATCH 04/15] Warnings for Experimental Store Signed-off-by: Elliot Scribner --- .../couchbase_offline_store/couchbase.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py index 2c8198435d..7a369c32a9 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py @@ -1,4 +1,5 @@ import contextlib +import warnings from dataclasses import asdict from datetime import datetime, timedelta from typing import ( @@ -47,6 +48,9 @@ SavedDatasetCouchbaseColumnarStorage, ) +# Only prints out runtime warnings once. +warnings.simplefilter("once", RuntimeWarning) + class CouchbaseColumnarOfflineStoreConfig(FeastConfigBaseModel): """Offline store config for Couchbase Columnar""" @@ -74,6 +78,11 @@ def pull_latest_from_table_or_query( """ Fetch the latest rows for each join key. """ + warnings.warn( + "This offline store is an experimental feature in alpha development. " + "Some functionality may still be unstable, and behavior can change in the future.", + RuntimeWarning, + ) assert isinstance(config.offline_store, CouchbaseColumnarOfflineStoreConfig) assert isinstance(data_source, CouchbaseColumnarSource) from_expression = data_source.get_table_query_string() @@ -131,6 +140,11 @@ def get_historical_features( """ Retrieve historical features using point-in-time joins. """ + warnings.warn( + "This offline store is an experimental feature in alpha development. " + "Some functionality may still be unstable, and behavior can change in the future.", + RuntimeWarning, + ) assert isinstance(config.offline_store, CouchbaseColumnarOfflineStoreConfig) for fv in feature_views: assert isinstance(fv.batch_source, CouchbaseColumnarSource) @@ -221,6 +235,11 @@ def pull_all_from_table_or_query( """ Fetch all rows from the specified table or query within the time range. """ + warnings.warn( + "This offline store is an experimental feature in alpha development. " + "Some functionality may still be unstable, and behavior can change in the future.", + RuntimeWarning, + ) assert isinstance(config.offline_store, CouchbaseColumnarOfflineStoreConfig) assert isinstance(data_source, CouchbaseColumnarSource) from_expression = data_source.get_table_query_string() From 047a67273f8e03cc0f7439eacf68edae97b16275 Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Wed, 5 Feb 2025 11:41:21 -0800 Subject: [PATCH 05/15] Initial Template Signed-off-by: Elliot Scribner --- .../couchbase_offline_store/couchbase.py | 20 +-- .../couchbase_source.py | 6 +- .../feast/templates/couchbase/bootstrap.py | 106 ++++++++++++++ .../couchbase/feature_repo/example_repo.py | 134 ++++++++++++++++++ .../couchbase/feature_repo/feature_store.yaml | 8 +- .../couchbase/feature_repo/test_workflow.py | 112 +++++++++++++++ 6 files changed, 368 insertions(+), 18 deletions(-) create mode 100644 sdk/python/feast/templates/couchbase/bootstrap.py create mode 100644 sdk/python/feast/templates/couchbase/feature_repo/example_repo.py create mode 100644 sdk/python/feast/templates/couchbase/feature_repo/test_workflow.py diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py index 7a369c32a9..52e2c30f48 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py @@ -19,7 +19,6 @@ import numpy as np import pandas as pd -import pyarrow import pyarrow as pa from couchbase_columnar.cluster import Cluster from couchbase_columnar.common.result import BlockingQueryResult @@ -265,15 +264,6 @@ def pull_all_from_table_or_query( timestamp_field=timestamp_field, ) - @staticmethod - def offline_write_batch( - config: RepoConfig, - feature_view: FeatureView, - table: pyarrow.Table, - progress: Optional[Callable[[int], Any]], - ): - raise NotImplementedError("Couchbase offline_write_batch not implemented yet.") - class CouchbaseColumnarRetrievalJob(RetrievalJob): def __init__( @@ -349,7 +339,7 @@ def persist( ): assert isinstance(storage, SavedDatasetCouchbaseColumnarStorage) table_name = f"{storage.couchbase_options._database}.{storage.couchbase_options._scope}.{offline_utils.get_temp_entity_table_name()}" - _df_to_columnar(self.to_df(), table_name, self._config) + df_to_columnar(self.to_df(), table_name, self._config.offline_store) def _get_columnar_cluster(config: 
CouchbaseColumnarOfflineStoreConfig) -> Cluster: @@ -375,7 +365,7 @@ def _execute_query( ) -def _df_to_columnar(df: pd.DataFrame, table_name: str, config: RepoConfig): +def df_to_columnar(df: pd.DataFrame, table_name: str, offline_store: CouchbaseColumnarOfflineStoreConfig): df_copy = df.copy() insert_values = df_copy.apply( lambda row: { @@ -392,15 +382,15 @@ def _df_to_columnar(df: pd.DataFrame, table_name: str, config: RepoConfig): create_collection_query = f"CREATE COLLECTION {table_name} IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;" insert_query = f"INSERT INTO {table_name} ({insert_values});" - _execute_query(config.offline_store, create_collection_query) - _execute_query(config.offline_store, insert_query) + _execute_query(offline_store, create_collection_query) + _execute_query(offline_store, insert_query) def _upload_entity_df( config: RepoConfig, entity_df: Union[pd.DataFrame, str], table_name: str ): if isinstance(entity_df, pd.DataFrame): - _df_to_columnar(entity_df, table_name, config) + df_to_columnar(entity_df, table_name, config.offline_store) elif isinstance(entity_df, str): # If the entity_df is a string (SQL query), create a Columnar collection out of it create_collection_query = f""" diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py index 46ddfba19a..a28f853da1 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py @@ -230,8 +230,10 @@ def get_table_column_names_and_types( field_types_list = rows[0].get("field_types", []) for field in field_types_list: field_name = field.get("field-name", "unknown") - # if field_name == "pk": - # continue + field_type = field.get("field-type", "unknown") + # drop uuid fields to ensure schema matches dataframe + if field_type == "uuid": + continue field_type = self._infer_composite_type(field) field_type_pairs.append((field_name, field_type)) return field_type_pairs diff --git a/sdk/python/feast/templates/couchbase/bootstrap.py b/sdk/python/feast/templates/couchbase/bootstrap.py new file mode 100644 index 0000000000..a638db426e --- /dev/null +++ b/sdk/python/feast/templates/couchbase/bootstrap.py @@ -0,0 +1,106 @@ +import click +from couchbase_columnar.cluster import Cluster +from couchbase_columnar.common.errors import InvalidCredentialError, TimeoutError +from couchbase_columnar.common.options import QueryOptions +from couchbase_columnar.credential import Credential + +from feast.file_utils import replace_str_in_file +from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase import ( + CouchbaseColumnarOfflineStoreConfig, + df_to_columnar, +) + + +def bootstrap(): + # Bootstrap() will automatically be called from the init_repo() during `feast init` + + import pathlib + from datetime import datetime, timedelta + + from feast.driver_test_data import create_driver_hourly_stats_df + + repo_path = pathlib.Path(__file__).parent.absolute() / "feature_repo" + config_file = repo_path / "feature_store.yaml" + + if click.confirm("Configure Couchbase Online Store?", default=True): + connection_string = click.prompt( + "Couchbase Connection String", default="couchbase://localhost" + ) + user = click.prompt("Couchbase Username", default="Administrator") + password = click.prompt("Couchbase Password", hide_input=True) + bucket_name = 
click.prompt("Couchbase Bucket Name", default="feast") + kv_port = click.prompt("Couchbase KV Port", default=11210) + + replace_str_in_file( + config_file, "COUCHBASE_CONNECTION_STRING", connection_string + ) + replace_str_in_file(config_file, "COUCHBASE_USER", user) + replace_str_in_file(config_file, "COUCHBASE_PASSWORD", password) + replace_str_in_file(config_file, "COUCHBASE_BUCKET_NAME", bucket_name) + replace_str_in_file(config_file, "COUCHBASE_KV_PORT", str(kv_port)) + + if click.confirm( + "Configure Couchbase Columnar Offline Store? (Note: requires Couchbase Capella Columnar)", + default=True, + ): + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date) + + columnar_connection_string = click.prompt("Columnar Connection String") + columnar_user = click.prompt("Columnar Username") + columnar_password = click.prompt("Columnar Password", hide_input=True) + columnar_timeout = click.prompt("Couchbase Columnar Timeout", default=480) + + if click.confirm( + 'Should I upload example data to Couchbase Capella Columnar (overwriting "Default.Default.feast_driver_hourly_stats" table)?', + default=True, + ): + cred = Credential.from_username_and_password( + columnar_user, columnar_password + ) + cluster = Cluster.create_instance(columnar_connection_string, cred) + + table_name = "Default.Default.feast_driver_hourly_stats" + try: + cluster.execute_query( + f"DROP COLLECTION {table_name} IF EXISTS", + QueryOptions(timeout=timedelta(seconds=500)), + ) + except TimeoutError: + # FIXME: temp workaround, timeouts occur in Columnar SDK even when the drop was successful + pass + except InvalidCredentialError: + print("Error: Invalid Cluster Credentials.") + return + + offline_store = CouchbaseColumnarOfflineStoreConfig( + type="couchbase", + connection_string=columnar_connection_string, + user=columnar_user, + password=columnar_password, + timeout=columnar_timeout, + ) + + df_to_columnar( + df=driver_df, table_name=table_name, offline_store=offline_store + ) + + replace_str_in_file( + config_file, + "COUCHBASE_COLUMNAR_CONNECTION_STRING", + columnar_connection_string, + ) + replace_str_in_file(config_file, "COUCHBASE_COLUMNAR_USER", columnar_user) + replace_str_in_file( + config_file, "COUCHBASE_COLUMNAR_PASSWORD", columnar_password + ) + replace_str_in_file( + config_file, "COUCHBASE_COLUMNAR_TIMEOUT", str(columnar_timeout) + ) + + +if __name__ == "__main__": + bootstrap() diff --git a/sdk/python/feast/templates/couchbase/feature_repo/example_repo.py b/sdk/python/feast/templates/couchbase/feature_repo/example_repo.py new file mode 100644 index 0000000000..a50a66a8cb --- /dev/null +++ b/sdk/python/feast/templates/couchbase/feature_repo/example_repo.py @@ -0,0 +1,134 @@ +# This is an example feature definition file + +from datetime import timedelta + +import pandas as pd + +from feast import Entity, FeatureService, FeatureView, Field, PushSource, RequestSource +from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase_source import ( + CouchbaseColumnarSource, +) +from feast.on_demand_feature_view import on_demand_feature_view +from feast.types import Float32, Float64, Int64 + +# Define an entity for the driver. You can think of an entity as a primary key used to +# fetch features. 
+driver = Entity(name="driver", join_keys=["driver_id"]) + +driver_stats_source = CouchbaseColumnarSource( + name="driver_hourly_stats_source", + query="SELECT * FROM foo_database.bar_scope.`feast_driver_hourly_stats`", + database="foo_database", + scope="bar_scope", + collection="feast_driver_hourly_stats", + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +# Our source collection contains sample data that includes a driver_id column, timestamps and +# three feature columns. Here we define a Feature View that will allow us to serve this +# data to our model online. +driver_stats_fv = FeatureView( + # The unique name of this feature view. Two feature views in a single + # project cannot have the same name + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=1), + # The list of features defined below acts as a schema, both defining features + # for materialization of features into a store and serving as references + # during retrieval for building a training dataset or serving features + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_stats_source, + # Tags are user-defined key/value pairs that are attached to each + # feature view + tags={"team": "driver_performance"}, +) + +# Define a request data source which encodes features / information only +# available at request time (e.g. part of the user-initiated HTTP request) +input_request = RequestSource( + name="vals_to_add", + schema=[ + Field(name="val_to_add", dtype=Int64), + Field(name="val_to_add_2", dtype=Int64), + ], +) + + +# Define an on demand feature view which can generate new features based on +# existing feature views and RequestSource features +@on_demand_feature_view( + sources=[driver_stats_fv, input_request], + schema=[ + Field(name="conv_rate_plus_val1", dtype=Float64), + Field(name="conv_rate_plus_val2", dtype=Float64), + ], +) +def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() + df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"] + df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"] + return df + + +# This groups features into a model version +driver_activity_v1 = FeatureService( + name="driver_activity_v1", + features=[ + driver_stats_fv[["conv_rate"]], # Sub-selects a feature from a feature view + transformed_conv_rate, # Selects all features from the feature view + ], +) +driver_activity_v2 = FeatureService( + name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate] +) + +# Defines a way to push data (to be available offline, online or both) into Feast. +driver_stats_push_source = PushSource( + name="driver_stats_push_source", + batch_source=driver_stats_source, +) + +# Defines a slightly modified version of the feature view from above, where the source +# has been changed to the push source. This allows fresh features to be directly pushed +# to the online store for this feature view.
+driver_stats_fresh_fv = FeatureView( + name="driver_hourly_stats_fresh", + entities=[driver], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_stats_push_source, # Changed from above + tags={"team": "driver_performance"}, +) + + +# Define an on demand feature view which can generate new features based on +# existing feature views and RequestSource features +@on_demand_feature_view( + sources=[driver_stats_fresh_fv, input_request], # relies on fresh version of FV + schema=[ + Field(name="conv_rate_plus_val1", dtype=Float64), + Field(name="conv_rate_plus_val2", dtype=Float64), + ], +) +def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() + df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"] + df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"] + return df + + +driver_activity_v3 = FeatureService( + name="driver_activity_v3", + features=[driver_stats_fresh_fv, transformed_conv_rate_fresh], +) diff --git a/sdk/python/feast/templates/couchbase/feature_repo/feature_store.yaml b/sdk/python/feast/templates/couchbase/feature_repo/feature_store.yaml index bc21e44def..913f17565a 100644 --- a/sdk/python/feast/templates/couchbase/feature_repo/feature_store.yaml +++ b/sdk/python/feast/templates/couchbase/feature_repo/feature_store.yaml @@ -1,5 +1,5 @@ project: my_project -registry: /path/to/registry.db +registry: data/registry.db provider: local online_store: type: couchbase @@ -8,4 +8,10 @@ online_store: password: COUCHBASE_PASSWORD # Couchbase password from database access credentials bucket_name: COUCHBASE_BUCKET_NAME # Couchbase bucket name, defaults to feast kv_port: COUCHBASE_KV_PORT # Couchbase key-value port, defaults to 11210. Required if custom ports are used. 
+offline_store: + type: couchbase + connection_string: COUCHBASE_COLUMNAR_CONNECTION_STRING # Copied from 'Connect' page in Capella Columnar console, starts with couchbases:// + user: COUCHBASE_COLUMNAR_USER # Couchbase cluster access name from access control + password: COUCHBASE_COLUMNAR_PASSWORD # Couchbase password from access control + timeout: COUCHBASE_COLUMNAR_TIMEOUT # Timeout in seconds for Columnar operations, optional entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/couchbase/feature_repo/test_workflow.py b/sdk/python/feast/templates/couchbase/feature_repo/test_workflow.py new file mode 100644 index 0000000000..192d575181 --- /dev/null +++ b/sdk/python/feast/templates/couchbase/feature_repo/test_workflow.py @@ -0,0 +1,112 @@ +import os.path +import subprocess +from datetime import datetime + +import pandas as pd + +from feast import FeatureStore + + +def run_demo(): + store = FeatureStore(repo_path=os.path.dirname(__file__)) + print("\n--- Run feast apply to set up feature store on Couchbase ---") + subprocess.run(["feast", "--chdir", os.path.dirname(__file__), "apply"]) + + print("\n--- Historical features for training ---") + fetch_historical_features_entity_df(store, for_batch_scoring=False) + + print("\n--- Historical features for batch scoring ---") + fetch_historical_features_entity_df(store, for_batch_scoring=True) + + print("\n--- Load features into online store ---") + store.materialize_incremental(end_date=datetime.now()) + + print("\n--- Online features ---") + fetch_online_features(store) + + print("\n--- Online features retrieved (instead) through a feature service ---") + fetch_online_features(store, source="feature_service") + + print( + "\n--- Online features retrieved (using feature service v3, which uses a feature view with a push source) ---" + ) + fetch_online_features(store, source="push") + + print("\n--- Online features again with updated values from a stream push ---") + fetch_online_features(store, source="push") + + print("\n--- Run feast teardown ---") + subprocess.run(["feast", "--chdir", os.path.dirname(__file__), "teardown"]) + + +def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool): + # Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve + # for all entities in the offline store instead + entity_df = pd.DataFrame.from_dict( + { + # entity's join key -> entity values + "driver_id": [1001, 1002, 1003], + # "event_timestamp" (reserved key) -> timestamps + "event_timestamp": [ + datetime(2021, 4, 12, 10, 59, 42), + datetime(2021, 4, 12, 8, 12, 10), + datetime(2021, 4, 12, 16, 40, 26), + ], + # (optional) label name -> label values.
Feast does not process these + "label_driver_reported_satisfaction": [1, 5, 3], + # values we're using for an on-demand transformation + "val_to_add": [1, 2, 3], + "val_to_add_2": [10, 20, 30], + } + ) + # For batch scoring, we want the latest timestamps + if for_batch_scoring: + entity_df["event_timestamp"] = pd.to_datetime("now", utc=True) + + training_df = store.get_historical_features( + entity_df=entity_df, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "transformed_conv_rate:conv_rate_plus_val1", + "transformed_conv_rate:conv_rate_plus_val2", + ], + ).to_df() + print(training_df.head()) + + +def fetch_online_features(store, source: str = ""): + entity_rows = [ + # {join_key: entity_value} + { + "driver_id": 1001, + "val_to_add": 1000, + "val_to_add_2": 2000, + }, + { + "driver_id": 1002, + "val_to_add": 1001, + "val_to_add_2": 2002, + }, + ] + if source == "feature_service": + features_to_fetch = store.get_feature_service("driver_activity_v1") + elif source == "push": + features_to_fetch = store.get_feature_service("driver_activity_v3") + else: + features_to_fetch = [ + "driver_hourly_stats:acc_rate", + "transformed_conv_rate:conv_rate_plus_val1", + "transformed_conv_rate:conv_rate_plus_val2", + ] + returned_features = store.get_online_features( + features=features_to_fetch, + entity_rows=entity_rows, + ).to_dict() + for key, value in sorted(returned_features.items()): + print(key, " : ", value) + + +if __name__ == "__main__": + run_demo() From d14f415a6a8295880ef8d7c8446d3c2e06095a77 Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Wed, 5 Feb 2025 11:48:19 -0800 Subject: [PATCH 06/15] Temp Timeout Fix and Lint Signed-off-by: Elliot Scribner --- .../contrib/couchbase_offline_store/couchbase.py | 6 +++++- .../contrib/couchbase_offline_store/tests/data_source.py | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py index 52e2c30f48..638ae6ffd9 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py @@ -365,7 +365,11 @@ def _execute_query( ) -def df_to_columnar(df: pd.DataFrame, table_name: str, offline_store: CouchbaseColumnarOfflineStoreConfig): +def df_to_columnar( + df: pd.DataFrame, + table_name: str, + offline_store: CouchbaseColumnarOfflineStoreConfig, +): df_copy = df.copy() insert_values = df_copy.apply( lambda row: { diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py index 02c806c866..413dc2e618 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py @@ -141,5 +141,8 @@ def teardown(self): ), ) print(f"Successfully dropped collection: {collection}") + except TimeoutError: + # FIXME: temp workaround, timeouts occur in Columnar SDK even when the drop was successful + pass except Exception as e: print(f"Error dropping collection {collection}: {e}") From b229f28f2f12432b9428da03db74622ae2fadd9e Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Wed, 5 Feb 2025 11:56:05 -0800 Subject: [PATCH 07/15] Initial 
Docs Signed-off-by: Elliot Scribner --- docs/SUMMARY.md | 1 + docs/reference/data-sources/README.md | 4 ++ docs/reference/data-sources/couchbase.md | 37 ++++++++++++ docs/reference/data-sources/overview.md | 22 +++---- docs/reference/offline-stores/README.md | 4 ++ docs/reference/offline-stores/couchbase.md | 69 ++++++++++++++++++++++ docs/reference/offline-stores/overview.md | 42 ++++++------- sdk/python/feast/type_map.py | 1 + 8 files changed, 148 insertions(+), 32 deletions(-) create mode 100644 docs/reference/data-sources/couchbase.md create mode 100644 docs/reference/offline-stores/couchbase.md diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index bbda7773b4..d67bb8dc86 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -93,6 +93,7 @@ * [BigQuery](reference/offline-stores/bigquery.md) * [Redshift](reference/offline-stores/redshift.md) * [DuckDB](reference/offline-stores/duckdb.md) + * [Couchbase Columnar (contrib)](reference/offline-stores/couchbase.md) * [Spark (contrib)](reference/offline-stores/spark.md) * [PostgreSQL (contrib)](reference/offline-stores/postgres.md) * [Trino (contrib)](reference/offline-stores/trino.md) diff --git a/docs/reference/data-sources/README.md b/docs/reference/data-sources/README.md index e69fbab8e3..09df6b861e 100644 --- a/docs/reference/data-sources/README.md +++ b/docs/reference/data-sources/README.md @@ -34,6 +34,10 @@ Please see [Data Source](../../getting-started/concepts/data-ingestion.md) for a [kinesis.md](kinesis.md) {% endcontent-ref %} +{% content-ref url="couchbase.md" %} +[couchbase.md](couchbase.md) +{% endcontent-ref %} + {% content-ref url="spark.md" %} [spark.md](spark.md) {% endcontent-ref %} diff --git a/docs/reference/data-sources/couchbase.md b/docs/reference/data-sources/couchbase.md new file mode 100644 index 0000000000..596e33cf50 --- /dev/null +++ b/docs/reference/data-sources/couchbase.md @@ -0,0 +1,37 @@ +# Couchbase Columnar source (contrib) + +## Description + +Couchbase Columnar data sources are [Couchbase Capella Columnar](https://docs.couchbase.com/columnar/intro/intro.html) collections that can be used as a source for feature data. **Note that Couchbase Columnar is available through [Couchbase Capella](https://cloud.couchbase.com/).** + +## Disclaimer + +The Couchbase Columnar data source does not achieve full test coverage. +Please do not assume complete stability. + +## Examples + +Defining a Couchbase Columnar source: + +```python +from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase_source import ( + CouchbaseColumnarSource, +) + +driver_stats_source = CouchbaseColumnarSource( + name="driver_hourly_stats_source", + query="SELECT * FROM Default.Default.`feast_driver_hourly_stats`", + database="Default", + scope="Default", + collection="feast_driver_hourly_stats", + timestamp_field="event_timestamp", + created_timestamp_column="created", +) +``` + +The full set of configuration options is available [here](https://rtd.feast.dev/en/master/#feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase_source.CouchbaseColumnarSource). + +## Supported Types + +Couchbase Capella Columnar data sources support `BOOLEAN`, `STRING`, `BIGINT`, and `DOUBLE` primitive types. +For a comparison against other batch data sources, please see [here](overview.md#functionality-matrix). 
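+
+As a usage sketch, the source defined above can back a feature view like any other batch source (assuming the `driver` entity and schema from the template repo; names are placeholders):
+
+```python
+from datetime import timedelta
+
+from feast import Entity, FeatureView, Field
+from feast.types import Float32, Int64
+
+driver = Entity(name="driver", join_keys=["driver_id"])
+
+driver_stats_fv = FeatureView(
+    name="driver_hourly_stats",
+    entities=[driver],
+    ttl=timedelta(days=1),
+    schema=[
+        Field(name="conv_rate", dtype=Float32),
+        Field(name="avg_daily_trips", dtype=Int64),
+    ],
+    source=driver_stats_source,  # the CouchbaseColumnarSource defined above
+)
+```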
diff --git a/docs/reference/data-sources/overview.md b/docs/reference/data-sources/overview.md index 5c2fdce9fd..9880d388dd 100644 --- a/docs/reference/data-sources/overview.md +++ b/docs/reference/data-sources/overview.md @@ -18,14 +18,14 @@ Details for each specific data source can be found [here](README.md). Below is a matrix indicating which data sources support which types. -| | File | BigQuery | Snowflake | Redshift | Postgres | Spark | Trino | -| :-------------------------------- | :-- | :-- |:----------| :-- | :-- | :-- | :-- | -| `bytes` | yes | yes | yes | yes | yes | yes | yes | -| `string` | yes | yes | yes | yes | yes | yes | yes | -| `int32` | yes | yes | yes | yes | yes | yes | yes | -| `int64` | yes | yes | yes | yes | yes | yes | yes | -| `float32` | yes | yes | yes | yes | yes | yes | yes | -| `float64` | yes | yes | yes | yes | yes | yes | yes | -| `bool` | yes | yes | yes | yes | yes | yes | yes | -| `timestamp` | yes | yes | yes | yes | yes | yes | yes | -| array types | yes | yes | yes | no | yes | yes | no | \ No newline at end of file +| | File | BigQuery | Snowflake | Redshift | Postgres | Spark | Trino | Couchbase | +| :-------------------------------- | :-- | :-- |:----------| :-- | :-- | :-- | :-- |:----------| +| `bytes` | yes | yes | yes | yes | yes | yes | yes | yes | +| `string` | yes | yes | yes | yes | yes | yes | yes | yes | +| `int32` | yes | yes | yes | yes | yes | yes | yes | yes | +| `int64` | yes | yes | yes | yes | yes | yes | yes | yes | +| `float32` | yes | yes | yes | yes | yes | yes | yes | yes | +| `float64` | yes | yes | yes | yes | yes | yes | yes | yes | +| `bool` | yes | yes | yes | yes | yes | yes | yes | yes | +| `timestamp` | yes | yes | yes | yes | yes | yes | yes | yes | +| array types | yes | yes | yes | no | yes | yes | no | no | diff --git a/docs/reference/offline-stores/README.md b/docs/reference/offline-stores/README.md index 2b62c4e1f1..ab25fe9a27 100644 --- a/docs/reference/offline-stores/README.md +++ b/docs/reference/offline-stores/README.md @@ -26,6 +26,10 @@ Please see [Offline Store](../../getting-started/components/offline-store.md) fo [duckdb.md](duckdb.md) {% endcontent-ref %} +{% content-ref url="couchbase.md" %} +[couchbase.md](couchbase.md) +{% endcontent-ref %} + {% content-ref url="spark.md" %} [spark.md](spark.md) {% endcontent-ref %} diff --git a/docs/reference/offline-stores/couchbase.md b/docs/reference/offline-stores/couchbase.md new file mode 100644 index 0000000000..75d437a3f5 --- /dev/null +++ b/docs/reference/offline-stores/couchbase.md @@ -0,0 +1,69 @@ +# Couchbase Columnar offline store (contrib) + +## Description + +The Couchbase Columnar offline store provides support for reading [CouchbaseColumnarSources](../data-sources/couchbase.md). **Note that Couchbase Columnar is available through [Couchbase Capella](https://cloud.couchbase.com/).** +* Entity dataframes can be provided as a SQL++ query or can be provided as a Pandas dataframe. A Pandas dataframe will be uploaded to Couchbase Capella Columnar as a collection. + +## Disclaimer + +The Couchbase Columnar offline store does not achieve full test coverage. +Please do not assume complete stability. + +## Getting started + +In order to use this offline store, you'll need to run `pip install 'feast[couchbase]'`. You can get started by then running `feast init -t couchbase`. 
+ +## Example + +{% code title="feature_store.yaml" %} +```yaml +project: my_project +registry: data/registry.db +provider: local +offline_store: + type: couchbase + connection_string: COUCHBASE_COLUMNAR_CONNECTION_STRING # Copied from 'Connect' page in Capella Columnar console, starts with couchbases:// + user: COUCHBASE_COLUMNAR_USER # Couchbase username from access credentials + password: COUCHBASE_COLUMNAR_PASSWORD # Couchbase password from access credentials + timeout: 120 # Timeout in seconds for Columnar operations, optional +online_store: + path: data/online_store.db +``` +{% endcode %} + +Note that `timeout` is an optional parameter. +The full set of configuration options is available in [CouchbaseColumnarOfflineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase.CouchbaseColumnarOfflineStoreConfig). + + +## Functionality Matrix + +The set of functionality supported by offline stores is described in detail [here](overview.md#functionality). +Below is a matrix indicating which functionality is supported by the Couchbase Columnar offline store. + +| | Couchbase Columnar | +| :----------------------------------------------------------------- |:-------------------| +| `get_historical_features` (point-in-time correct join) | yes | +| `pull_latest_from_table_or_query` (retrieve latest feature values) | yes | +| `pull_all_from_table_or_query` (retrieve a saved dataset) | yes | +| `offline_write_batch` (persist dataframes to offline store) | no | +| `write_logged_features` (persist logged features to offline store) | no | + +Below is a matrix indicating which functionality is supported by `CouchbaseColumnarRetrievalJob`. + +| | Couchbase Columnar | +| ----------------------------------------------------- |--------------------| +| export to dataframe | yes | +| export to arrow table | yes | +| export to arrow batches | no | +| export to SQL | yes | +| export to data lake (S3, GCS, etc.) | yes | +| export to data warehouse | yes | +| export as Spark dataframe | no | +| local execution of Python-based on-demand transforms | yes | +| remote execution of Python-based on-demand transforms | no | +| persist results in the offline store | yes | +| preview the query plan before execution | yes | +| read partitioned data | yes | + +To compare this set of functionality against other offline stores, please see the full [functionality matrix](overview.md#functionality-matrix). diff --git a/docs/reference/offline-stores/overview.md b/docs/reference/offline-stores/overview.md index 182eac6586..191ccd21a6 100644 --- a/docs/reference/offline-stores/overview.md +++ b/docs/reference/offline-stores/overview.md @@ -31,28 +31,28 @@ Details for each specific offline store, such as how to configure it in a `featu Below is a matrix indicating which offline stores support which methods.
-| | Dask | BigQuery | Snowflake | Redshift | Postgres | Spark | Trino | -| :-------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | -| `get_historical_features` | yes | yes | yes | yes | yes | yes | yes | -| `pull_latest_from_table_or_query` | yes | yes | yes | yes | yes | yes | yes | -| `pull_all_from_table_or_query` | yes | yes | yes | yes | yes | yes | yes | -| `offline_write_batch` | yes | yes | yes | yes | no | no | no | -| `write_logged_features` | yes | yes | yes | yes | no | no | no | +| | Dask | BigQuery | Snowflake | Redshift | Postgres | Spark | Trino | Couchbase | +| :-------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | +| `get_historical_features` | yes | yes | yes | yes | yes | yes | yes | yes | +| `pull_latest_from_table_or_query` | yes | yes | yes | yes | yes | yes | yes | yes | +| `pull_all_from_table_or_query` | yes | yes | yes | yes | yes | yes | yes | yes | +| `offline_write_batch` | yes | yes | yes | yes | no | no | no | no | +| `write_logged_features` | yes | yes | yes | yes | no | no | no | no | Below is a matrix indicating which `RetrievalJob`s support what functionality. -| | Dask | BigQuery | Snowflake | Redshift | Postgres | Spark | Trino | DuckDB | -| --------------------------------- | --- | --- | --- | --- | --- | --- | --- | --- | -| export to dataframe | yes | yes | yes | yes | yes | yes | yes | yes | -| export to arrow table | yes | yes | yes | yes | yes | yes | yes | yes | -| export to arrow batches | no | no | no | yes | no | no | no | no | -| export to SQL | no | yes | yes | yes | yes | no | yes | no | -| export to data lake (S3, GCS, etc.) | no | no | yes | no | yes | no | no | no | -| export to data warehouse | no | yes | yes | yes | yes | no | no | no | -| export as Spark dataframe | no | no | yes | no | no | yes | no | no | -| local execution of Python-based on-demand transforms | yes | yes | yes | yes | yes | no | yes | yes | -| remote execution of Python-based on-demand transforms | no | no | no | no | no | no | no | no | -| persist results in the offline store | yes | yes | yes | yes | yes | yes | no | yes | -| preview the query plan before execution | yes | yes | yes | yes | yes | yes | yes | no | -| read partitioned data | yes | yes | yes | yes | yes | yes | yes | yes | +| | Dask | BigQuery | Snowflake | Redshift | Postgres | Spark | Trino | DuckDB | Couchbase | +| --------------------------------- | --- | --- | --- | --- | --- | --- | --- | --- | --- | +| export to dataframe | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| export to arrow table | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| export to arrow batches | no | no | no | yes | no | no | no | no | no | +| export to SQL | no | yes | yes | yes | yes | no | yes | no | yes | +| export to data lake (S3, GCS, etc.) 
| no | no | yes | no | yes | no | no | no | yes | +| export to data warehouse | no | yes | yes | yes | yes | no | no | no | yes | +| export as Spark dataframe | no | no | yes | no | no | yes | no | no | no | +| local execution of Python-based on-demand transforms | yes | yes | yes | yes | yes | no | yes | yes | yes | +| remote execution of Python-based on-demand transforms | no | no | no | no | no | no | no | no | no | +| persist results in the offline store | yes | yes | yes | yes | yes | yes | no | yes | yes | +| preview the query plan before execution | yes | yes | yes | yes | yes | yes | yes | no | yes | +| read partitioned data | yes | yes | yes | yes | yes | yes | yes | yes | yes | diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 526a5346e0..269d431661 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -1093,6 +1093,7 @@ def cb_columnar_type_to_feast_value_type(type_str: str) -> ValueType: "null": ValueType.NULL, "missing": ValueType.UNKNOWN, # composite types + # todo: support for arrays of primitives "object": ValueType.UNKNOWN, "array": ValueType.UNKNOWN, "multiset": ValueType.UNKNOWN, From 1300915ab8f0d11452c13b95973c4a66f6b0b06d Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Thu, 6 Feb 2025 13:43:06 -0800 Subject: [PATCH 08/15] Fixing Template Signed-off-by: Elliot Scribner --- sdk/python/feast/templates/couchbase/bootstrap.py | 2 +- .../feast/templates/couchbase/feature_repo/example_repo.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/templates/couchbase/bootstrap.py b/sdk/python/feast/templates/couchbase/bootstrap.py index a638db426e..f69a7bd593 100644 --- a/sdk/python/feast/templates/couchbase/bootstrap.py +++ b/sdk/python/feast/templates/couchbase/bootstrap.py @@ -24,7 +24,7 @@ def bootstrap(): if click.confirm("Configure Couchbase Online Store?", default=True): connection_string = click.prompt( - "Couchbase Connection String", default="couchbase://localhost" + "Couchbase Connection String", default="couchbase://127.0.0.1" ) user = click.prompt("Couchbase Username", default="Administrator") password = click.prompt("Couchbase Password", hide_input=True) diff --git a/sdk/python/feast/templates/couchbase/feature_repo/example_repo.py b/sdk/python/feast/templates/couchbase/feature_repo/example_repo.py index a50a66a8cb..363ba3c466 100644 --- a/sdk/python/feast/templates/couchbase/feature_repo/example_repo.py +++ b/sdk/python/feast/templates/couchbase/feature_repo/example_repo.py @@ -17,9 +17,9 @@ driver_stats_source = CouchbaseColumnarSource( name="driver_hourly_stats_source", - query="SELECT * FROM foo_database.bar_scope.`feast_driver_hourly_stats`", - database="foo_database", - scope="bar_scope", + query="SELECT * FROM Default.Default.`feast_driver_hourly_stats`", + database="Default", + scope="Default", collection="feast_driver_hourly_stats", timestamp_field="event_timestamp", created_timestamp_column="created", From 652f46abf692965fab3fa22571561ae47e12b3ec Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Thu, 6 Feb 2025 14:54:48 -0800 Subject: [PATCH 09/15] Protos Signed-off-by: Elliot Scribner --- protos/feast/core/FeatureService.proto | 10 ++++ .../protos/feast/core/FeatureService_pb2.py | 38 +++++++------- .../protos/feast/core/FeatureService_pb2.pyi | 50 ++++++++++--------- 3 files changed, 57 insertions(+), 41 deletions(-) diff --git a/protos/feast/core/FeatureService.proto b/protos/feast/core/FeatureService.proto index b143ba73f4..380b2dc371 100644 --- 
a/protos/feast/core/FeatureService.proto +++ b/protos/feast/core/FeatureService.proto @@ -61,6 +61,7 @@ message LoggingConfig { SnowflakeDestination snowflake_destination = 6; CustomDestination custom_destination = 7; AthenaDestination athena_destination = 8; + CouchbaseColumnarDestination couchbase_columnar_destination = 9; } message FileDestination { @@ -95,6 +96,15 @@ message LoggingConfig { string kind = 1; map config = 2; } + + message CouchbaseColumnarDestination { + // Destination database name + string database = 1; + // Destination scope name + string scope = 2; + // Destination collection name + string collection = 3; + } } message FeatureServiceList { diff --git a/sdk/python/feast/protos/feast/core/FeatureService_pb2.py b/sdk/python/feast/protos/feast/core/FeatureService_pb2.py index 642d5b010f..7ef3607969 100644 --- a/sdk/python/feast/protos/feast/core/FeatureService_pb2.py +++ b/sdk/python/feast/protos/feast/core/FeatureService_pb2.py @@ -16,7 +16,7 @@ from feast.protos.feast.core import FeatureViewProjection_pb2 as feast_dot_core_dot_FeatureViewProjection__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1f\x66\x65\x61st/core/FeatureService.proto\x12\nfeast.core\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&feast/core/FeatureViewProjection.proto\"l\n\x0e\x46\x65\x61tureService\x12,\n\x04spec\x18\x01 \x01(\x0b\x32\x1e.feast.core.FeatureServiceSpec\x12,\n\x04meta\x18\x02 \x01(\x0b\x32\x1e.feast.core.FeatureServiceMeta\"\xa4\x02\n\x12\x46\x65\x61tureServiceSpec\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0f\n\x07project\x18\x02 \x01(\t\x12\x33\n\x08\x66\x65\x61tures\x18\x03 \x03(\x0b\x32!.feast.core.FeatureViewProjection\x12\x36\n\x04tags\x18\x04 \x03(\x0b\x32(.feast.core.FeatureServiceSpec.TagsEntry\x12\x13\n\x0b\x64\x65scription\x18\x05 \x01(\t\x12\r\n\x05owner\x18\x06 \x01(\t\x12\x31\n\x0elogging_config\x18\x07 \x01(\x0b\x32\x19.feast.core.LoggingConfig\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x87\x01\n\x12\x46\x65\x61tureServiceMeta\x12\x35\n\x11\x63reated_timestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12:\n\x16last_updated_timestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x9a\x07\n\rLoggingConfig\x12\x13\n\x0bsample_rate\x18\x01 \x01(\x02\x12\x45\n\x10\x66ile_destination\x18\x03 \x01(\x0b\x32).feast.core.LoggingConfig.FileDestinationH\x00\x12M\n\x14\x62igquery_destination\x18\x04 \x01(\x0b\x32-.feast.core.LoggingConfig.BigQueryDestinationH\x00\x12M\n\x14redshift_destination\x18\x05 \x01(\x0b\x32-.feast.core.LoggingConfig.RedshiftDestinationH\x00\x12O\n\x15snowflake_destination\x18\x06 \x01(\x0b\x32..feast.core.LoggingConfig.SnowflakeDestinationH\x00\x12I\n\x12\x63ustom_destination\x18\x07 \x01(\x0b\x32+.feast.core.LoggingConfig.CustomDestinationH\x00\x12I\n\x12\x61thena_destination\x18\x08 \x01(\x0b\x32+.feast.core.LoggingConfig.AthenaDestinationH\x00\x1aS\n\x0f\x46ileDestination\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x1c\n\x14s3_endpoint_override\x18\x02 \x01(\t\x12\x14\n\x0cpartition_by\x18\x03 \x03(\t\x1a(\n\x13\x42igQueryDestination\x12\x11\n\ttable_ref\x18\x01 \x01(\t\x1a)\n\x13RedshiftDestination\x12\x12\n\ntable_name\x18\x01 \x01(\t\x1a\'\n\x11\x41thenaDestination\x12\x12\n\ntable_name\x18\x01 \x01(\t\x1a*\n\x14SnowflakeDestination\x12\x12\n\ntable_name\x18\x01 \x01(\t\x1a\x99\x01\n\x11\x43ustomDestination\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12G\n\x06\x63onfig\x18\x02 
\x03(\x0b\x32\x37.feast.core.LoggingConfig.CustomDestination.ConfigEntry\x1a-\n\x0b\x43onfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\r\n\x0b\x64\x65stination\"I\n\x12\x46\x65\x61tureServiceList\x12\x33\n\x0f\x66\x65\x61tureservices\x18\x01 \x03(\x0b\x32\x1a.feast.core.FeatureServiceBX\n\x10\x66\x65\x61st.proto.coreB\x13\x46\x65\x61tureServiceProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1f\x66\x65\x61st/core/FeatureService.proto\x12\nfeast.core\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&feast/core/FeatureViewProjection.proto\"l\n\x0e\x46\x65\x61tureService\x12,\n\x04spec\x18\x01 \x01(\x0b\x32\x1e.feast.core.FeatureServiceSpec\x12,\n\x04meta\x18\x02 \x01(\x0b\x32\x1e.feast.core.FeatureServiceMeta\"\xa4\x02\n\x12\x46\x65\x61tureServiceSpec\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0f\n\x07project\x18\x02 \x01(\t\x12\x33\n\x08\x66\x65\x61tures\x18\x03 \x03(\x0b\x32!.feast.core.FeatureViewProjection\x12\x36\n\x04tags\x18\x04 \x03(\x0b\x32(.feast.core.FeatureServiceSpec.TagsEntry\x12\x13\n\x0b\x64\x65scription\x18\x05 \x01(\t\x12\r\n\x05owner\x18\x06 \x01(\t\x12\x31\n\x0elogging_config\x18\x07 \x01(\x0b\x32\x19.feast.core.LoggingConfig\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x87\x01\n\x12\x46\x65\x61tureServiceMeta\x12\x35\n\x11\x63reated_timestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12:\n\x16last_updated_timestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xd1\x08\n\rLoggingConfig\x12\x13\n\x0bsample_rate\x18\x01 \x01(\x02\x12\x45\n\x10\x66ile_destination\x18\x03 \x01(\x0b\x32).feast.core.LoggingConfig.FileDestinationH\x00\x12M\n\x14\x62igquery_destination\x18\x04 \x01(\x0b\x32-.feast.core.LoggingConfig.BigQueryDestinationH\x00\x12M\n\x14redshift_destination\x18\x05 \x01(\x0b\x32-.feast.core.LoggingConfig.RedshiftDestinationH\x00\x12O\n\x15snowflake_destination\x18\x06 \x01(\x0b\x32..feast.core.LoggingConfig.SnowflakeDestinationH\x00\x12I\n\x12\x63ustom_destination\x18\x07 \x01(\x0b\x32+.feast.core.LoggingConfig.CustomDestinationH\x00\x12I\n\x12\x61thena_destination\x18\x08 \x01(\x0b\x32+.feast.core.LoggingConfig.AthenaDestinationH\x00\x12`\n\x1e\x63ouchbase_columnar_destination\x18\t \x01(\x0b\x32\x36.feast.core.LoggingConfig.CouchbaseColumnarDestinationH\x00\x1aS\n\x0f\x46ileDestination\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x1c\n\x14s3_endpoint_override\x18\x02 \x01(\t\x12\x14\n\x0cpartition_by\x18\x03 \x03(\t\x1a(\n\x13\x42igQueryDestination\x12\x11\n\ttable_ref\x18\x01 \x01(\t\x1a)\n\x13RedshiftDestination\x12\x12\n\ntable_name\x18\x01 \x01(\t\x1a\'\n\x11\x41thenaDestination\x12\x12\n\ntable_name\x18\x01 \x01(\t\x1a*\n\x14SnowflakeDestination\x12\x12\n\ntable_name\x18\x01 \x01(\t\x1a\x99\x01\n\x11\x43ustomDestination\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12G\n\x06\x63onfig\x18\x02 \x03(\x0b\x32\x37.feast.core.LoggingConfig.CustomDestination.ConfigEntry\x1a-\n\x0b\x43onfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1aS\n\x1c\x43ouchbaseColumnarDestination\x12\x10\n\x08\x64\x61tabase\x18\x01 \x01(\t\x12\r\n\x05scope\x18\x02 \x01(\t\x12\x12\n\ncollection\x18\x03 \x01(\tB\r\n\x0b\x64\x65stination\"I\n\x12\x46\x65\x61tureServiceList\x12\x33\n\x0f\x66\x65\x61tureservices\x18\x01 
\x03(\x0b\x32\x1a.feast.core.FeatureServiceBX\n\x10\x66\x65\x61st.proto.coreB\x13\x46\x65\x61tureServiceProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -37,21 +37,23 @@ _globals['_FEATURESERVICEMETA']._serialized_start=526 _globals['_FEATURESERVICEMETA']._serialized_end=661 _globals['_LOGGINGCONFIG']._serialized_start=664 - _globals['_LOGGINGCONFIG']._serialized_end=1586 - _globals['_LOGGINGCONFIG_FILEDESTINATION']._serialized_start=1162 - _globals['_LOGGINGCONFIG_FILEDESTINATION']._serialized_end=1245 - _globals['_LOGGINGCONFIG_BIGQUERYDESTINATION']._serialized_start=1247 - _globals['_LOGGINGCONFIG_BIGQUERYDESTINATION']._serialized_end=1287 - _globals['_LOGGINGCONFIG_REDSHIFTDESTINATION']._serialized_start=1289 - _globals['_LOGGINGCONFIG_REDSHIFTDESTINATION']._serialized_end=1330 - _globals['_LOGGINGCONFIG_ATHENADESTINATION']._serialized_start=1332 - _globals['_LOGGINGCONFIG_ATHENADESTINATION']._serialized_end=1371 - _globals['_LOGGINGCONFIG_SNOWFLAKEDESTINATION']._serialized_start=1373 - _globals['_LOGGINGCONFIG_SNOWFLAKEDESTINATION']._serialized_end=1415 - _globals['_LOGGINGCONFIG_CUSTOMDESTINATION']._serialized_start=1418 - _globals['_LOGGINGCONFIG_CUSTOMDESTINATION']._serialized_end=1571 - _globals['_LOGGINGCONFIG_CUSTOMDESTINATION_CONFIGENTRY']._serialized_start=1526 - _globals['_LOGGINGCONFIG_CUSTOMDESTINATION_CONFIGENTRY']._serialized_end=1571 - _globals['_FEATURESERVICELIST']._serialized_start=1588 - _globals['_FEATURESERVICELIST']._serialized_end=1661 + _globals['_LOGGINGCONFIG']._serialized_end=1769 + _globals['_LOGGINGCONFIG_FILEDESTINATION']._serialized_start=1260 + _globals['_LOGGINGCONFIG_FILEDESTINATION']._serialized_end=1343 + _globals['_LOGGINGCONFIG_BIGQUERYDESTINATION']._serialized_start=1345 + _globals['_LOGGINGCONFIG_BIGQUERYDESTINATION']._serialized_end=1385 + _globals['_LOGGINGCONFIG_REDSHIFTDESTINATION']._serialized_start=1387 + _globals['_LOGGINGCONFIG_REDSHIFTDESTINATION']._serialized_end=1428 + _globals['_LOGGINGCONFIG_ATHENADESTINATION']._serialized_start=1430 + _globals['_LOGGINGCONFIG_ATHENADESTINATION']._serialized_end=1469 + _globals['_LOGGINGCONFIG_SNOWFLAKEDESTINATION']._serialized_start=1471 + _globals['_LOGGINGCONFIG_SNOWFLAKEDESTINATION']._serialized_end=1513 + _globals['_LOGGINGCONFIG_CUSTOMDESTINATION']._serialized_start=1516 + _globals['_LOGGINGCONFIG_CUSTOMDESTINATION']._serialized_end=1669 + _globals['_LOGGINGCONFIG_CUSTOMDESTINATION_CONFIGENTRY']._serialized_start=1624 + _globals['_LOGGINGCONFIG_CUSTOMDESTINATION_CONFIGENTRY']._serialized_end=1669 + _globals['_LOGGINGCONFIG_COUCHBASECOLUMNARDESTINATION']._serialized_start=1671 + _globals['_LOGGINGCONFIG_COUCHBASECOLUMNARDESTINATION']._serialized_end=1754 + _globals['_FEATURESERVICELIST']._serialized_start=1771 + _globals['_FEATURESERVICELIST']._serialized_end=1844 # @@protoc_insertion_point(module_scope) diff --git a/sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi b/sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi index 0652ce0216..6d5879e52c 100644 --- a/sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi +++ b/sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi @@ -196,26 +196,6 @@ class LoggingConfig(google.protobuf.message.Message): table_name: builtins.str = ..., ) -> None: ... def ClearField(self, field_name: typing_extensions.Literal["table_name", b"table_name"]) -> None: ... 
- class CouchbaseColumnarDestination(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - DATABASE_FIELD_NUMBER: builtins.int - SCOPE_FIELD_NUMBER: builtins.int - COLLECTION_FIELD_NUMBER: builtins.int - database: builtins.str - """Destination database name""" - scope: builtins.str - """Destination scope name""" - collection: builtins.str - """Destination collection name""" - def __init__( - self, - *, - database: builtins.str = ..., - scope: builtins.str = ..., - collection: builtins.str = ..., - ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["database", b"database", "scope", b"scope", "collection", b"collection"]) -> None: ... class CustomDestination(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -248,6 +228,27 @@ class LoggingConfig(google.protobuf.message.Message): ) -> None: ... def ClearField(self, field_name: typing_extensions.Literal["config", b"config", "kind", b"kind"]) -> None: ... + class CouchbaseColumnarDestination(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + DATABASE_FIELD_NUMBER: builtins.int + SCOPE_FIELD_NUMBER: builtins.int + COLLECTION_FIELD_NUMBER: builtins.int + database: builtins.str + """Destination database name""" + scope: builtins.str + """Destination scope name""" + collection: builtins.str + """Destination collection name""" + def __init__( + self, + *, + database: builtins.str = ..., + scope: builtins.str = ..., + collection: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["collection", b"collection", "database", b"database", "scope", b"scope"]) -> None: ... + SAMPLE_RATE_FIELD_NUMBER: builtins.int FILE_DESTINATION_FIELD_NUMBER: builtins.int BIGQUERY_DESTINATION_FIELD_NUMBER: builtins.int @@ -255,6 +256,7 @@ class LoggingConfig(google.protobuf.message.Message): SNOWFLAKE_DESTINATION_FIELD_NUMBER: builtins.int CUSTOM_DESTINATION_FIELD_NUMBER: builtins.int ATHENA_DESTINATION_FIELD_NUMBER: builtins.int + COUCHBASE_COLUMNAR_DESTINATION_FIELD_NUMBER: builtins.int sample_rate: builtins.float @property def file_destination(self) -> global___LoggingConfig.FileDestination: ... @@ -268,6 +270,8 @@ class LoggingConfig(google.protobuf.message.Message): def custom_destination(self) -> global___LoggingConfig.CustomDestination: ... @property def athena_destination(self) -> global___LoggingConfig.AthenaDestination: ... + @property + def couchbase_columnar_destination(self) -> global___LoggingConfig.CouchbaseColumnarDestination: ... def __init__( self, *, @@ -280,9 +284,9 @@ class LoggingConfig(google.protobuf.message.Message): athena_destination: global___LoggingConfig.AthenaDestination | None = ..., couchbase_columnar_destination: global___LoggingConfig.CouchbaseColumnarDestination | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["athena_destination", b"athena_destination", "bigquery_destination", b"bigquery_destination", "custom_destination", b"custom_destination", "destination", b"destination", "file_destination", b"file_destination", "redshift_destination", b"redshift_destination", "snowflake_destination", b"snowflake_destination"]) -> builtins.bool: ... 
- def ClearField(self, field_name: typing_extensions.Literal["athena_destination", b"athena_destination", "bigquery_destination", b"bigquery_destination", "custom_destination", b"custom_destination", "destination", b"destination", "file_destination", b"file_destination", "redshift_destination", b"redshift_destination", "sample_rate", b"sample_rate", "snowflake_destination", b"snowflake_destination"]) -> None: ... - def WhichOneof(self, oneof_group: typing_extensions.Literal["destination", b"destination"]) -> typing_extensions.Literal["file_destination", "bigquery_destination", "redshift_destination", "snowflake_destination", "custom_destination", "athena_destination"] | None: ... + def HasField(self, field_name: typing_extensions.Literal["athena_destination", b"athena_destination", "bigquery_destination", b"bigquery_destination", "couchbase_columnar_destination", b"couchbase_columnar_destination", "custom_destination", b"custom_destination", "destination", b"destination", "file_destination", b"file_destination", "redshift_destination", b"redshift_destination", "snowflake_destination", b"snowflake_destination"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["athena_destination", b"athena_destination", "bigquery_destination", b"bigquery_destination", "couchbase_columnar_destination", b"couchbase_columnar_destination", "custom_destination", b"custom_destination", "destination", b"destination", "file_destination", b"file_destination", "redshift_destination", b"redshift_destination", "sample_rate", b"sample_rate", "snowflake_destination", b"snowflake_destination"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["destination", b"destination"]) -> typing_extensions.Literal["file_destination", "bigquery_destination", "redshift_destination", "snowflake_destination", "custom_destination", "athena_destination", "couchbase_columnar_destination"] | None: ... global___LoggingConfig = LoggingConfig From a28b8d2bcd288ad7a7c52a6e2574b93aa735fb84 Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Thu, 6 Feb 2025 14:55:05 -0800 Subject: [PATCH 10/15] Make build-sphinx Signed-off-by: Elliot Scribner --- ...stores.contrib.couchbase_offline_store.rst | 37 +++++++++++++++++++ ....contrib.couchbase_offline_store.tests.rst | 21 +++++++++++ .../feast.infra.offline_stores.contrib.rst | 9 +++++ .../source/feast.infra.utils.couchbase.rst | 21 +++++++++++ sdk/python/docs/source/feast.infra.utils.rst | 1 + sdk/python/docs/source/feast.rst | 8 ++++ 6 files changed, 97 insertions(+) create mode 100644 sdk/python/docs/source/feast.infra.offline_stores.contrib.couchbase_offline_store.rst create mode 100644 sdk/python/docs/source/feast.infra.offline_stores.contrib.couchbase_offline_store.tests.rst create mode 100644 sdk/python/docs/source/feast.infra.utils.couchbase.rst diff --git a/sdk/python/docs/source/feast.infra.offline_stores.contrib.couchbase_offline_store.rst b/sdk/python/docs/source/feast.infra.offline_stores.contrib.couchbase_offline_store.rst new file mode 100644 index 0000000000..7104b02bb6 --- /dev/null +++ b/sdk/python/docs/source/feast.infra.offline_stores.contrib.couchbase_offline_store.rst @@ -0,0 +1,37 @@ +feast.infra.offline\_stores.contrib.couchbase\_offline\_store package +===================================================================== + +Subpackages +----------- + +.. 
toctree:: + :maxdepth: 4 + + feast.infra.offline_stores.contrib.couchbase_offline_store.tests + +Submodules +---------- + +feast.infra.offline\_stores.contrib.couchbase\_offline\_store.couchbase module +------------------------------------------------------------------------------ + +.. automodule:: feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase + :members: + :undoc-members: + :show-inheritance: + +feast.infra.offline\_stores.contrib.couchbase\_offline\_store.couchbase\_source module +-------------------------------------------------------------------------------------- + +.. automodule:: feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase_source + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.offline_stores.contrib.couchbase_offline_store + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.offline_stores.contrib.couchbase_offline_store.tests.rst b/sdk/python/docs/source/feast.infra.offline_stores.contrib.couchbase_offline_store.tests.rst new file mode 100644 index 0000000000..41566b5359 --- /dev/null +++ b/sdk/python/docs/source/feast.infra.offline_stores.contrib.couchbase_offline_store.tests.rst @@ -0,0 +1,21 @@ +feast.infra.offline\_stores.contrib.couchbase\_offline\_store.tests package +=========================================================================== + +Submodules +---------- + +feast.infra.offline\_stores.contrib.couchbase\_offline\_store.tests.data\_source module +--------------------------------------------------------------------------------------- + +.. automodule:: feast.infra.offline_stores.contrib.couchbase_offline_store.tests.data_source + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.offline_stores.contrib.couchbase_offline_store.tests + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst b/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst index ec74ddab05..61e797bd6a 100644 --- a/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst +++ b/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst @@ -8,6 +8,7 @@ Subpackages :maxdepth: 4 feast.infra.offline_stores.contrib.athena_offline_store + feast.infra.offline_stores.contrib.couchbase_offline_store feast.infra.offline_stores.contrib.mssql_offline_store feast.infra.offline_stores.contrib.postgres_offline_store feast.infra.offline_stores.contrib.spark_offline_store @@ -24,6 +25,14 @@ feast.infra.offline\_stores.contrib.athena\_repo\_configuration module :undoc-members: :show-inheritance: +feast.infra.offline\_stores.contrib.couchbase\_columnar\_repo\_configuration module +----------------------------------------------------------------------------------- + +.. 
automodule:: feast.infra.offline_stores.contrib.couchbase_columnar_repo_configuration + :members: + :undoc-members: + :show-inheritance: + feast.infra.offline\_stores.contrib.mssql\_repo\_configuration module --------------------------------------------------------------------- diff --git a/sdk/python/docs/source/feast.infra.utils.couchbase.rst b/sdk/python/docs/source/feast.infra.utils.couchbase.rst new file mode 100644 index 0000000000..d6d2025c42 --- /dev/null +++ b/sdk/python/docs/source/feast.infra.utils.couchbase.rst @@ -0,0 +1,21 @@ +feast.infra.utils.couchbase package +=================================== + +Submodules +---------- + +feast.infra.utils.couchbase.couchbase\_utils module +--------------------------------------------------- + +.. automodule:: feast.infra.utils.couchbase.couchbase_utils + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.utils.couchbase + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.utils.rst b/sdk/python/docs/source/feast.infra.utils.rst index 083259bfaa..cfa82dc5fd 100644 --- a/sdk/python/docs/source/feast.infra.utils.rst +++ b/sdk/python/docs/source/feast.infra.utils.rst @@ -7,6 +7,7 @@ Subpackages .. toctree:: :maxdepth: 4 + feast.infra.utils.couchbase feast.infra.utils.postgres feast.infra.utils.snowflake diff --git a/sdk/python/docs/source/feast.rst b/sdk/python/docs/source/feast.rst index ea34c3d8dd..fdb91b2342 100644 --- a/sdk/python/docs/source/feast.rst +++ b/sdk/python/docs/source/feast.rst @@ -332,6 +332,14 @@ feast.saved\_dataset module :undoc-members: :show-inheritance: +feast.ssl\_ca\_trust\_store\_setup module +----------------------------------------- + +.. automodule:: feast.ssl_ca_trust_store_setup + :members: + :undoc-members: + :show-inheritance: + feast.stream\_feature\_view module ---------------------------------- From 58800990d4718b9b270e48da8dd5090e941486e6 Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Thu, 6 Feb 2025 15:08:32 -0800 Subject: [PATCH 11/15] Add info on Columnar setup to docs Signed-off-by: Elliot Scribner --- docs/reference/offline-stores/couchbase.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/reference/offline-stores/couchbase.md b/docs/reference/offline-stores/couchbase.md index 75d437a3f5..97e1aaefd9 100644 --- a/docs/reference/offline-stores/couchbase.md +++ b/docs/reference/offline-stores/couchbase.md @@ -14,6 +14,16 @@ Please do not assume complete stability. In order to use this offline store, you'll need to run `pip install 'feast[couchbase]'`. You can get started by then running `feast init -t couchbase`. +To get started with Couchbase Capella Columnar: +1. Sign up for a [Couchbase Capella](https://cloud.couchbase.com/) account +2. [Deploy a Columnar cluster](https://docs.couchbase.com/columnar/admin/prepare-project.html) +3. [Create an Access Control Account](https://docs.couchbase.com/columnar/admin/auth/auth-data.html) + - This account should be able to read and write. + - For testing purposes, it is recommended to assign all roles to avoid any permission issues. +4. [Configure allowed IP addresses](https://docs.couchbase.com/columnar/admin/ip-allowed-list.html) + - You must allow the IP address of the machine running Feast. 
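+
+Once the cluster, access credentials, and IP allowlist are in place, you can optionally sanity-check connectivity before configuring Feast. The snippet below is a minimal sketch that uses the same Columnar SDK calls the offline store itself relies on; the endpoint, user, and password are placeholders, not values from this repository:
+
+```python
+from datetime import timedelta
+
+from couchbase_columnar.cluster import Cluster
+from couchbase_columnar.credential import Credential
+from couchbase_columnar.options import QueryOptions
+
+# Placeholders: copy the real values from the Capella Columnar console
+cred = Credential.from_username_and_password("your-access-name", "your-password")
+cluster = Cluster.create_instance("couchbases://your-columnar-endpoint", cred)
+
+# A trivial query fails fast if the credentials or IP allowlist are wrong
+result = cluster.execute_query("SELECT 1 AS ping;", QueryOptions(timeout=timedelta(seconds=120)))
+print(result.get_all_rows())
+```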
+ + ## Example {% code title="feature_store.yaml" %} From 08f70646ee2c8dde0eb44df6625b8adb774db927 Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Thu, 6 Feb 2025 18:48:06 -0800 Subject: [PATCH 12/15] Docs Adjustment Signed-off-by: Elliot Scribner --- docs/reference/offline-stores/couchbase.md | 6 +++--- .../templates/couchbase/feature_repo/feature_store.yaml | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/reference/offline-stores/couchbase.md b/docs/reference/offline-stores/couchbase.md index 97e1aaefd9..f14b760801 100644 --- a/docs/reference/offline-stores/couchbase.md +++ b/docs/reference/offline-stores/couchbase.md @@ -33,9 +33,9 @@ registry: data/registry.db provider: local offline_store: type: couchbase - connection_string: COUCHBASE_COLUMNAR_CONNECTION_STRING # Copied from 'Connect' page in Capella Columnar console, starts with couchbases:// - user: COUCHBASE_COLUMNAR_USER # Couchbase username from access credentials - password: COUCHBASE_COLUMNAR_PASSWORD # Couchbase password from access credentials + connection_string: COUCHBASE_COLUMNAR_CONNECTION_STRING # Copied from Settings > Connection String page in Capella Columnar console, starts with couchbases:// + user: COUCHBASE_COLUMNAR_USER # Couchbase cluster access name from Settings > Access Control page in Capella Columnar console + password: COUCHBASE_COLUMNAR_PASSWORD # Couchbase password from Settings > Access Control page in Capella Columnar console timeout: 120 # Timeout in seconds for Columnar operations, optional online_store: path: data/online_store.db diff --git a/sdk/python/feast/templates/couchbase/feature_repo/feature_store.yaml b/sdk/python/feast/templates/couchbase/feature_repo/feature_store.yaml index 913f17565a..02d47506aa 100644 --- a/sdk/python/feast/templates/couchbase/feature_repo/feature_store.yaml +++ b/sdk/python/feast/templates/couchbase/feature_repo/feature_store.yaml @@ -10,8 +10,8 @@ online_store: kv_port: COUCHBASE_KV_PORT # Couchbase key-value port, defaults to 11210. Required if custom ports are used. 
 offline_store:
     type: couchbase
-    connection_string: COUCHBASE_COLUMNAR_CONNECTION_STRING # Copied from 'Connect' page in Capella Columnar console, starts with couchbases://
-    user: COUCHBASE_COLUMNAR_USER # Couchbase cluster access name from access control
-    password: COUCHBASE_COLUMNAR_PASSWORD # Couchbase password from access control
+    connection_string: COUCHBASE_COLUMNAR_CONNECTION_STRING # Copied from Settings > Connection String page in Capella Columnar console, starts with couchbases://
+    user: COUCHBASE_COLUMNAR_USER # Couchbase cluster access name from Settings > Access Control page in Capella Columnar console
+    password: COUCHBASE_COLUMNAR_PASSWORD # Couchbase password from Settings > Access Control page in Capella Columnar console
     timeout: COUCHBASE_COLUMNAR_TIMEOUT # Timeout in seconds for Columnar operations, optional
     entity_key_serialization_version: 2

From 440a0c3b63430f2047a62f2fa318ee4f12b799ec Mon Sep 17 00:00:00 2001
From: Elliot Scribner
Date: Thu, 6 Feb 2025 18:56:43 -0800
Subject: [PATCH 13/15] Lint Fix

Signed-off-by: Elliot Scribner
---
 .../offline_stores/contrib/couchbase_offline_store/couchbase.py | 2 +-
 sdk/python/feast/templates/couchbase/bootstrap.py               | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py
index 638ae6ffd9..2180ee21db 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py
@@ -472,7 +472,7 @@ def build_point_in_time_query(
     final_output_feature_names.extend(
         [
             (
-                f'{fv["name"]}__{fv["field_mapping"].get(feature, feature)}'
+                f"{fv['name']}__{fv['field_mapping'].get(feature, feature)}"
                 if full_feature_names
                 else fv["field_mapping"].get(feature, feature)
             )
diff --git a/sdk/python/feast/templates/couchbase/bootstrap.py b/sdk/python/feast/templates/couchbase/bootstrap.py
index f69a7bd593..4a13f5847a 100644
--- a/sdk/python/feast/templates/couchbase/bootstrap.py
+++ b/sdk/python/feast/templates/couchbase/bootstrap.py
@@ -1,8 +1,8 @@
 import click
 from couchbase_columnar.cluster import Cluster
 from couchbase_columnar.common.errors import InvalidCredentialError, TimeoutError
-from couchbase_columnar.common.options import QueryOptions
 from couchbase_columnar.credential import Credential
+from couchbase_columnar.options import QueryOptions

 from feast.file_utils import replace_str_in_file
 from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase import (

From 3b7e8701c2f72e2e410e1488450215971115435a Mon Sep 17 00:00:00 2001
From: Elliot Scribner
Date: Thu, 6 Feb 2025 19:26:35 -0800
Subject: [PATCH 14/15] Dispatch Timeouts

Signed-off-by: Elliot Scribner
---
 .../couchbase_offline_store/couchbase.py      |  7 +++++--
 .../couchbase_source.py                       | 11 ++++++++---
 .../tests/data_source.py                      | 18 +++++++++++-------
 .../feast/templates/couchbase/bootstrap.py    | 11 ++++++++---
 4 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py
index 2180ee21db..17ee8b7816 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py
+++ 

b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py @@ -23,7 +23,7 @@ from couchbase_columnar.cluster import Cluster from couchbase_columnar.common.result import BlockingQueryResult from couchbase_columnar.credential import Credential -from couchbase_columnar.options import QueryOptions +from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions from jinja2 import BaseLoader, Environment from pydantic import StrictFloat, StrictStr @@ -348,7 +348,10 @@ def _get_columnar_cluster(config: CouchbaseColumnarOfflineStoreConfig) -> Cluste assert config.password is not None cred = Credential.from_username_and_password(config.user, config.password) - return Cluster.create_instance(config.connection_string, cred) + timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) + return Cluster.create_instance( + config.connection_string, cred, ClusterOptions(timeout_options=timeout_opts) + ) def _execute_query( diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py index a28f853da1..89e4aa2332 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py @@ -4,7 +4,7 @@ from couchbase_columnar.cluster import Cluster from couchbase_columnar.credential import Credential -from couchbase_columnar.options import QueryOptions +from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions from typeguard import typechecked from feast.data_source import DataSource @@ -203,7 +203,12 @@ def get_table_column_names_and_types( cred = Credential.from_username_and_password( config.offline_store.user, config.offline_store.password ) - cluster = Cluster.create_instance(config.offline_store.connection_string, cred) + timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) + cluster = Cluster.create_instance( + config.offline_store.connection_string, + cred, + ClusterOptions(timeout_options=timeout_opts), + ) query_context = self.get_table_query_string() query = f""" @@ -218,7 +223,7 @@ def get_table_column_names_and_types( """ result = cluster.execute_query( - query, QueryOptions(timeout=timedelta(seconds=500)) + query, QueryOptions(timeout=timedelta(seconds=config.offline_store.timeout)) ) if not result: raise ZeroColumnQueryResult(query) diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py index 413dc2e618..157e71fe49 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py @@ -7,7 +7,7 @@ import pandas as pd from couchbase_columnar.cluster import Cluster from couchbase_columnar.credential import Credential -from couchbase_columnar.options import QueryOptions +from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions from feast.data_source import DataSource from feast.feature_logging import LoggingDestination @@ -38,7 +38,7 @@ def __init__(self, project_name: str, *args, **kwargs): connection_string=os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"], user=os.environ["COUCHBASE_COLUMNAR_USER"], 
password=os.environ["COUCHBASE_COLUMNAR_PASSWORD"], - timeout=480, + timeout=120, ) def create_data_source( @@ -67,8 +67,11 @@ def format_row(row): cred = Credential.from_username_and_password( self.offline_store_config.user, self.offline_store_config.password ) + timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) cluster = Cluster.create_instance( - self.offline_store_config.connection_string, cred + self.offline_store_config.connection_string, + cred, + ClusterOptions(timeout_options=timeout_opts), ) create_cluster_query = f"CREATE ANALYTICS COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.{collection_name} IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;" @@ -127,8 +130,12 @@ def teardown(self): cred = Credential.from_username_and_password( self.offline_store_config.user, self.offline_store_config.password ) + + timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) cluster = Cluster.create_instance( - self.offline_store_config.connection_string, cred + self.offline_store_config.connection_string, + cred, + ClusterOptions(timeout_options=timeout_opts), ) for collection in self.collections: @@ -141,8 +148,5 @@ def teardown(self): ), ) print(f"Successfully dropped collection: {collection}") - except TimeoutError: - # FIXME: temp workaround, timeouts occur in Columnar SDK even when the drop was successful - pass except Exception as e: print(f"Error dropping collection {collection}: {e}") diff --git a/sdk/python/feast/templates/couchbase/bootstrap.py b/sdk/python/feast/templates/couchbase/bootstrap.py index 4a13f5847a..ce3f6442a5 100644 --- a/sdk/python/feast/templates/couchbase/bootstrap.py +++ b/sdk/python/feast/templates/couchbase/bootstrap.py @@ -2,7 +2,7 @@ from couchbase_columnar.cluster import Cluster from couchbase_columnar.common.errors import InvalidCredentialError, TimeoutError from couchbase_columnar.credential import Credential -from couchbase_columnar.options import QueryOptions +from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions from feast.file_utils import replace_str_in_file from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase import ( @@ -61,13 +61,18 @@ def bootstrap(): cred = Credential.from_username_and_password( columnar_user, columnar_password ) - cluster = Cluster.create_instance(columnar_connection_string, cred) + timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) + cluster = Cluster.create_instance( + columnar_connection_string, + cred, + ClusterOptions(timeout_options=timeout_opts), + ) table_name = "Default.Default.feast_driver_hourly_stats" try: cluster.execute_query( f"DROP COLLECTION {table_name} IF EXISTS", - QueryOptions(timeout=timedelta(seconds=500)), + QueryOptions(timeout=timedelta(seconds=columnar_timeout)), ) except TimeoutError: # FIXME: temp workaround, timeouts occur in Columnar SDK even when the drop was successful From 655f2d154a63503921c966dc7451b6e3034b1bd0 Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Fri, 7 Feb 2025 12:52:23 -0800 Subject: [PATCH 15/15] Cleanup Steps for Test Resources Signed-off-by: Elliot Scribner --- .../tests/data_source.py | 113 ++++++++++++++---- 1 file changed, 87 insertions(+), 26 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py index 157e71fe49..5fe96d2ef5 100644 --- 
a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py @@ -1,5 +1,8 @@ +import atexit import json import os +import signal +import threading import uuid from datetime import timedelta from typing import Dict, List, Optional @@ -29,10 +32,31 @@ class CouchbaseColumnarDataSourceCreator(DataSourceCreator): - collections: List[str] = [] + _shutting_down = False + _cluster = None + _cluster_lock = threading.Lock() + + @classmethod + def get_cluster(cls): + with cls._cluster_lock: + if cls._cluster is None: + cred = Credential.from_username_and_password( + os.environ["COUCHBASE_COLUMNAR_USER"], + os.environ["COUCHBASE_COLUMNAR_PASSWORD"], + ) + timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) + cls._cluster = Cluster.create_instance( + os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"], + cred, + ClusterOptions(timeout_options=timeout_opts), + ) + return cls._cluster - def __init__(self, project_name: str, *args, **kwargs): + def __init__(self, project_name: str, **kwargs): super().__init__(project_name) + self.project_name = project_name + self.collections: List[str] = [] + self.offline_store_config = CouchbaseColumnarOfflineStoreConfig( type="couchbase", connection_string=os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"], @@ -64,18 +88,8 @@ def format_row(row): collection_name = self.get_prefixed_collection_name(destination_name) - cred = Credential.from_username_and_password( - self.offline_store_config.user, self.offline_store_config.password - ) - timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) - cluster = Cluster.create_instance( - self.offline_store_config.connection_string, - cred, - ClusterOptions(timeout_options=timeout_opts), - ) - create_cluster_query = f"CREATE ANALYTICS COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.{collection_name} IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;" - cluster.execute_query( + self.get_cluster().execute_query( create_cluster_query, QueryOptions(timeout=timedelta(seconds=self.offline_store_config.timeout)), ) @@ -88,7 +102,7 @@ def format_row(row): {values_clause} ]) """ - cluster.execute_query( + self.get_cluster().execute_query( insert_query, QueryOptions(timeout=timedelta(seconds=self.offline_store_config.timeout)), ) @@ -126,22 +140,52 @@ def create_offline_store_config(self) -> FeastConfigBaseModel: def get_prefixed_collection_name(self, suffix: str) -> str: return f"{self.project_name}_{suffix}" - def teardown(self): - cred = Credential.from_username_and_password( - self.offline_store_config.user, self.offline_store_config.password - ) - - timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) - cluster = Cluster.create_instance( - self.offline_store_config.connection_string, - cred, - ClusterOptions(timeout_options=timeout_opts), - ) + @classmethod + def get_dangling_collections(cls) -> List[str]: + query = """ + SELECT VALUE d.DatabaseName || '.' || d.DataverseName || '.' 
|| d.DatasetName + FROM System.Metadata.`Dataset` d + WHERE d.DataverseName <> "Metadata" + AND (REGEXP_CONTAINS(d.DatasetName, "integration_test_.*") + OR REGEXP_CONTAINS(d.DatasetName, "feast_entity_df_.*")); + """ + try: + res = cls.get_cluster().execute_query(query) + return res.get_all_rows() + except Exception as e: + print(f"Error fetching collections: {e}") + return [] + + @classmethod + def cleanup_all(cls): + if cls._shutting_down: + return + cls._shutting_down = True + try: + collections = cls.get_dangling_collections() + if len(collections) == 0: + print("No collections to clean up.") + return + + print(f"Found {len(collections)} collections to clean up.") + if len(collections) > 5: + print("This may take a few minutes...") + for collection in collections: + try: + query = f"DROP COLLECTION {collection} IF EXISTS;" + cls.get_cluster().execute_query(query) + print(f"Dropped collection: {collection}") + except Exception as e: + print(f"Error dropping collection {collection}: {e}") + finally: + print("Cleanup complete.") + cls._shutting_down = False + def teardown(self): for collection in self.collections: query = f"DROP COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.`{collection}` IF EXISTS;" try: - cluster.execute_query( + self.get_cluster().execute_query( query, QueryOptions( timeout=timedelta(seconds=self.offline_store_config.timeout) @@ -150,3 +194,20 @@ def teardown(self): print(f"Successfully dropped collection: {collection}") except Exception as e: print(f"Error dropping collection {collection}: {e}") + + +def cleanup_handler(signum, frame): + print("\nCleaning up dangling resources...") + try: + CouchbaseColumnarDataSourceCreator.cleanup_all() + except Exception as e: + print(f"Error during cleanup: {e}") + finally: + # Re-raise the signal to properly exit + signal.default_int_handler(signum, frame) + + +# Register both SIGINT and SIGTERM handlers +signal.signal(signal.SIGINT, cleanup_handler) +signal.signal(signal.SIGTERM, cleanup_handler) +atexit.register(CouchbaseColumnarDataSourceCreator.cleanup_all)
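
The resource lifecycle introduced in this final patch (a lazily created shared client, idempotent cleanup, and signal plus atexit hooks) is easiest to review in isolation. The following is a minimal sketch of the same pattern with a stand-in resource; `FakeCluster` and `ResourceManager` are illustrative names, not part of the Feast or Couchbase APIs:

```python
import atexit
import signal
import threading


class FakeCluster:
    """Stand-in for an expensive client such as a Columnar Cluster."""

    def close(self) -> None:
        print("connection closed")


class ResourceManager:
    _cluster = None
    _cluster_lock = threading.Lock()

    @classmethod
    def get_cluster(cls) -> FakeCluster:
        # Lazily create a single shared client; the lock stops concurrent
        # test workers from racing to create duplicates.
        with cls._cluster_lock:
            if cls._cluster is None:
                cls._cluster = FakeCluster()
            return cls._cluster

    @classmethod
    def cleanup_all(cls) -> None:
        # Called from both the signal handler and the atexit hook, so it is
        # written to be idempotent: the first caller takes the client, any
        # later caller sees None and returns without doing work.
        cluster, cls._cluster = cls._cluster, None
        if cluster is not None:
            cluster.close()


def cleanup_handler(signum, frame):
    try:
        ResourceManager.cleanup_all()
    finally:
        # Re-raise the signal so the process still terminates
        signal.default_int_handler(signum, frame)


signal.signal(signal.SIGINT, cleanup_handler)
signal.signal(signal.SIGTERM, cleanup_handler)
atexit.register(ResourceManager.cleanup_all)

if __name__ == "__main__":
    ResourceManager.get_cluster()
    # On normal exit, Ctrl+C, or SIGTERM, cleanup_all releases the client.
```

One behavior worth noting during review: `signal.default_int_handler` raises `KeyboardInterrupt`, so both the SIGINT and SIGTERM paths exit through that exception rather than the interpreter's default termination handling, which matches the patch as written.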