From bb0a4fba46dce41d5a7c0abcf305eac26d41dc90 Mon Sep 17 00:00:00 2001 From: robhowley Date: Fri, 31 May 2024 09:12:13 -0400 Subject: [PATCH 1/4] add aiobotocore and online_read_async to dynamo read Signed-off-by: robhowley --- .../feast/infra/online_stores/dynamodb.py | 128 +++++++++++++----- setup.py | 2 +- 2 files changed, 95 insertions(+), 35 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 0ee9af185d..e3cfa2aa85 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -35,6 +35,7 @@ import boto3 from botocore.config import Config from botocore.exceptions import ClientError + from aiobotocore import session except ImportError as e: from feast.errors import FeastExtrasDependencyImportError @@ -80,6 +81,7 @@ class DynamoDBOnlineStore(OnlineStore): _dynamodb_client = None _dynamodb_resource = None + _aioboto_session = None def update( self, @@ -206,38 +208,9 @@ def online_write_batch( ) self._write_batch_non_duplicates(table_instance, data, progress, config) - def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKeyProto], - requested_features: Optional[List[str]] = None, - ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - """ - Retrieve feature values from the online DynamoDB store. - - Args: - config: The RepoConfig for the current FeatureStore. - table: Feast FeatureView. - entity_keys: a list of entity keys that should be read from the FeatureStore. - """ - online_config = config.online_store - assert isinstance(online_config, DynamoDBOnlineStoreConfig) - dynamodb_resource = self._get_dynamodb_resource( - online_config.region, online_config.endpoint_url - ) - table_instance = dynamodb_resource.Table( - _get_table_name(online_config, config, table) - ) - + def _read_batches(self, online_config, entity_ids, table_name, batch_get_item) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - entity_ids = [ - compute_entity_id( - entity_key, - entity_key_serialization_version=config.entity_key_serialization_version, - ) - for entity_key in entity_keys - ] + batch_size = online_config.batch_size entity_ids_iter = iter(entity_ids) while True: @@ -249,16 +222,16 @@ def online_read( if len(batch) == 0: break batch_entity_ids = { - table_instance.name: { + table_name: { "Keys": [{"entity_id": entity_id} for entity_id in batch], "ConsistentRead": online_config.consistent_reads, } } - response = dynamodb_resource.batch_get_item( + response = batch_get_item( RequestItems=batch_entity_ids, ) response = response.get("Responses") - table_responses = response.get(table_instance.name) + table_responses = response.get(table_name) if table_responses: table_responses = self._sort_dynamodb_response( table_responses, entity_ids @@ -286,6 +259,93 @@ def online_read( result.extend(batch_result) return result + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Retrieve feature values from the online DynamoDB store. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. + entity_keys: a list of entity keys that should be read from the FeatureStore. + """ + online_config = config.online_store + assert isinstance(online_config, DynamoDBOnlineStoreConfig) + dynamodb_resource = self._get_dynamodb_resource( + online_config.region, online_config.endpoint_url + ) + table_instance = dynamodb_resource.Table( + _get_table_name(online_config, config, table) + ) + + entity_ids = [ + compute_entity_id( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + for entity_key in entity_keys + ] + + return self._read_batches( + online_config, + entity_ids, + table_instance.name, + dynamodb_resource.batch_get_item, + ) + + async def online_read_async( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Reads features values for the given entity keys asynchronously. + + Args: + config: The config for the current feature store. + table: The feature view whose feature values should be read. + entity_keys: The list of entity keys for which feature values should be read. + requested_features: The list of features that should be read. + + Returns: + A list of the same length as entity_keys. Each item in the list is a tuple where the first + item is the event timestamp for the row, and the second item is a dict mapping feature names + to values, which are returned in proto format. + """ + online_config = config.online_store + assert isinstance(online_config, DynamoDBOnlineStoreConfig) + + entity_ids = [ + compute_entity_id( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + for entity_key in entity_keys + ] + + async with self._get_aiodynamodb_client(online_config.region) as client: + return self._read_batches( + online_config, + entity_ids, + _get_table_name(online_config, config, table), + client.batch_get_item + ) + + def _get_aioboto_session(self): + if self._aioboto_session is None: + self._aioboto_session = session.get_session() + return self._aioboto_session + + def _get_aiodynamodb_client(self, region: str): + return self._get_aioboto_session().create_client("dynamodb", region_name=region) + def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None): if self._dynamodb_client is None: self._dynamodb_client = _initialize_dynamodb_client(region, endpoint_url) diff --git a/setup.py b/setup.py index cdab69b684..b250f7c362 100644 --- a/setup.py +++ b/setup.py @@ -84,7 +84,7 @@ "hiredis>=2.0.0,<3", ] -AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2", "fsspec<=2024.1.0"] +AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2", "fsspec<=2024.1.0", "aiobotocore>=2.13.0"] KUBERNETES_REQUIRED = ["kubernetes<=20.13.0"] From 6b178bbbf2ad5942583268a7098446ddcabc6753 Mon Sep 17 00:00:00 2001 From: robhowley Date: Fri, 31 May 2024 09:13:21 -0400 Subject: [PATCH 2/4] format Signed-off-by: robhowley --- sdk/python/feast/infra/online_stores/dynamodb.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index e3cfa2aa85..0b6bd1bd8b 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -33,9 +33,9 @@ try: import boto3 + from aiobotocore import session from botocore.config import Config from botocore.exceptions import ClientError - from aiobotocore import session except ImportError as e: from feast.errors import FeastExtrasDependencyImportError @@ -208,7 +208,9 @@ def online_write_batch( ) self._write_batch_non_duplicates(table_instance, data, progress, config) - def _read_batches(self, online_config, entity_ids, table_name, batch_get_item) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + def _read_batches( + self, online_config, entity_ids, table_name, batch_get_item + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] batch_size = online_config.batch_size @@ -335,7 +337,7 @@ async def online_read_async( online_config, entity_ids, _get_table_name(online_config, config, table), - client.batch_get_item + client.batch_get_item, ) def _get_aioboto_session(self): From a19d46dd9d07cfd25e6c4dc659a87499bf52f4f0 Mon Sep 17 00:00:00 2001 From: robhowley Date: Fri, 31 May 2024 09:21:18 -0400 Subject: [PATCH 3/4] update test to run on dynamo Signed-off-by: robhowley --- .../tests/integration/online_store/test_universal_online.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 4822a8d4f7..4cb474d2f1 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -476,7 +476,7 @@ def test_online_retrieval_with_event_timestamps(environment, universal_data_sour @pytest.mark.integration -@pytest.mark.universal_online_stores(only=["redis"]) +@pytest.mark.universal_online_stores(only=["redis", "dynamodb"]) def test_async_online_retrieval_with_event_timestamps( environment, universal_data_sources ): From d7a982f40566d2c4987b8e92a669bfbc16d3191b Mon Sep 17 00:00:00 2001 From: robhowley Date: Fri, 31 May 2024 09:24:36 -0400 Subject: [PATCH 4/4] duck type the table Signed-off-by: robhowley --- sdk/python/feast/infra/online_stores/dynamodb.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 0b6bd1bd8b..7c92cdafc7 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -209,7 +209,7 @@ def online_write_batch( self._write_batch_non_duplicates(table_instance, data, progress, config) def _read_batches( - self, online_config, entity_ids, table_name, batch_get_item + self, online_config, entity_ids, table_name, table_client ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] @@ -229,7 +229,7 @@ def _read_batches( "ConsistentRead": online_config.consistent_reads, } } - response = batch_get_item( + response = table_client.batch_get_item( RequestItems=batch_entity_ids, ) response = response.get("Responses") @@ -297,7 +297,7 @@ def online_read( online_config, entity_ids, table_instance.name, - dynamodb_resource.batch_get_item, + dynamodb_resource, ) async def online_read_async( @@ -337,7 +337,7 @@ async def online_read_async( online_config, entity_ids, _get_table_name(online_config, config, table), - client.batch_get_item, + client, ) def _get_aioboto_session(self):