Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: online_read_async for dynamodb #5

Merged
merged 4 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 95 additions & 33 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

try:
import boto3
from aiobotocore import session
from botocore.config import Config
from botocore.exceptions import ClientError
except ImportError as e:
Expand Down Expand Up @@ -80,6 +81,7 @@ class DynamoDBOnlineStore(OnlineStore):

_dynamodb_client = None
_dynamodb_resource = None
_aioboto_session = None

def update(
self,
Expand Down Expand Up @@ -206,38 +208,11 @@ def online_write_batch(
)
self._write_batch_non_duplicates(table_instance, data, progress, config)

def online_read(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
def _read_batches(
self, online_config, entity_ids, table_name, table_client
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Retrieve feature values from the online DynamoDB store.

Args:
config: The RepoConfig for the current FeatureStore.
table: Feast FeatureView.
entity_keys: a list of entity keys that should be read from the FeatureStore.
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)
table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
)

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
entity_ids = [
compute_entity_id(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
for entity_key in entity_keys
]

batch_size = online_config.batch_size
entity_ids_iter = iter(entity_ids)
while True:
Expand All @@ -249,16 +224,16 @@ def online_read(
if len(batch) == 0:
break
batch_entity_ids = {
table_instance.name: {
table_name: {
"Keys": [{"entity_id": entity_id} for entity_id in batch],
"ConsistentRead": online_config.consistent_reads,
}
}
response = dynamodb_resource.batch_get_item(
response = table_client.batch_get_item(
RequestItems=batch_entity_ids,
)
response = response.get("Responses")
table_responses = response.get(table_instance.name)
table_responses = response.get(table_name)
if table_responses:
table_responses = self._sort_dynamodb_response(
table_responses, entity_ids
Expand Down Expand Up @@ -286,6 +261,93 @@ def online_read(
result.extend(batch_result)
return result

def online_read(
    self,
    config: RepoConfig,
    table: FeatureView,
    entity_keys: List[EntityKeyProto],
    requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
    """
    Read feature values for the given entity keys from the online DynamoDB store.

    Args:
        config: The RepoConfig for the current FeatureStore.
        table: Feast FeatureView whose backing DynamoDB table is queried.
        entity_keys: Entity keys that should be read from the FeatureStore.
        requested_features: Accepted for interface compatibility; this method
            does not use it.

    Returns:
        The list produced by ``_read_batches`` for the computed entity ids.
    """
    store_config = config.online_store
    assert isinstance(store_config, DynamoDBOnlineStoreConfig)

    # The boto3 resource both resolves the table handle and performs the reads.
    resource = self._get_dynamodb_resource(
        store_config.region, store_config.endpoint_url
    )
    dynamo_table = resource.Table(_get_table_name(store_config, config, table))

    # Entity keys are converted to the entity-id form used as the lookup key.
    ids_to_fetch = [
        compute_entity_id(
            key,
            entity_key_serialization_version=config.entity_key_serialization_version,
        )
        for key in entity_keys
    ]

    return self._read_batches(
        store_config, ids_to_fetch, dynamo_table.name, resource
    )

async def online_read_async(
    self,
    config: RepoConfig,
    table: FeatureView,
    entity_keys: List[EntityKeyProto],
    requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
    """
    Read feature values for the given entity keys asynchronously.

    Args:
        config: The config for the current feature store.
        table: The feature view whose feature values should be read.
        entity_keys: The list of entity keys for which feature values should be read.
        requested_features: The list of features that should be read. Not used by
            this method.

    Returns:
        A list of the same length as entity_keys. Each item in the list is a tuple where the first
        item is the event timestamp for the row, and the second item is a dict mapping feature names
        to values, which are returned in proto format.
    """
    online_config = config.online_store
    assert isinstance(online_config, DynamoDBOnlineStoreConfig)

    entity_ids = [
        compute_entity_id(
            entity_key,
            entity_key_serialization_version=config.entity_key_serialization_version,
        )
        for entity_key in entity_keys
    ]

    # Pass the configured endpoint so the async path talks to the same DynamoDB
    # endpoint as the synchronous ``online_read`` (e.g. a local test instance).
    #
    # NOTE(review): ``_read_batches`` is a plain (non-async) helper that calls
    # ``table_client.batch_get_item(...)`` without ``await`` and immediately
    # calls ``.get("Responses")`` on the result. aiobotocore client operations
    # are coroutines, so this looks like it needs an awaited code path; the
    # untyped ``{"entity_id": entity_id}`` keys may also need the low-level
    # ``{"S": ...}`` form for a client (as opposed to a resource). Confirm
    # against the full ``_read_batches`` implementation.
    async with self._get_aiodynamodb_client(
        online_config.region, online_config.endpoint_url
    ) as client:
        return self._read_batches(
            online_config,
            entity_ids,
            _get_table_name(online_config, config, table),
            client,
        )

def _get_aioboto_session(self):
    """Return the cached aiobotocore session, creating it on first use."""
    if self._aioboto_session is None:
        self._aioboto_session = session.get_session()
    return self._aioboto_session

def _get_aiodynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
    """
    Create an aiobotocore DynamoDB client context for ``region``.

    Args:
        region: AWS region name passed as ``region_name``.
        endpoint_url: Optional endpoint override; ``None`` keeps the default
            endpoint resolution.

    Returns:
        The object returned by the session's ``create_client``; callers in this
        file use it with ``async with``.
    """
    return self._get_aioboto_session().create_client(
        "dynamodb", region_name=region, endpoint_url=endpoint_url
    )

def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
if self._dynamodb_client is None:
self._dynamodb_client = _initialize_dynamodb_client(region, endpoint_url)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ def test_online_retrieval_with_event_timestamps(environment, universal_data_sour


@pytest.mark.integration
@pytest.mark.universal_online_stores(only=["redis"])
@pytest.mark.universal_online_stores(only=["redis", "dynamodb"])
def test_async_online_retrieval_with_event_timestamps(
environment, universal_data_sources
):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
"hiredis>=2.0.0,<3",
]

AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2", "fsspec<=2024.1.0"]
AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2", "fsspec<=2024.1.0", "aiobotocore>=2.13.0"]

KUBERNETES_REQUIRED = ["kubernetes<=20.13.0"]

Expand Down