From 0e4215060f97b7629015ab65ac526dfef0a1f7d4 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Tue, 14 May 2024 09:25:54 -0700 Subject: [PATCH] feat: Feast/IKV upgrade client version (#4200) --- .../contrib/ikv_online_store/ikv.py | 34 ++++++++++++------- setup.py | 2 +- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py index 9d888aad3d..90df7f4686 100644 --- a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -82,9 +82,8 @@ def online_write_batch( progress: Function to be called once a batch of rows is written to the online store, used to show progress. """ - # update should have been called before - if self._writer is None: - return + self._init_writer(config=config) + assert self._writer is not None for entity_key, features, event_timestamp, _ in data: entity_id: str = compute_entity_id( @@ -120,6 +119,8 @@ def online_read( item is the event timestamp for the row, and the second item is a dict mapping feature names to values, which are returned in proto format. """ + self._init_reader(config=config) + if not len(entity_keys): return [] @@ -174,7 +175,6 @@ def _decode_fields_for_primary_key( return dt, features - # called before any read/write requests are issued @log_exceptions_and_usage(online_store="ikv") def update( self, @@ -199,7 +199,7 @@ def update( partial: If true, tables_to_delete and tables_to_keep are not exhaustive lists, so infrastructure corresponding to other feature views should be not be touched. """ - self._init_clients(config=config) + self._init_writer(config=config) assert self._writer is not None # note: we assume tables_to_keep does not overlap with tables_to_delete @@ -223,7 +223,7 @@ def teardown( tables: Feature views whose corresponding infrastructure should be deleted. entities: Entities whose corresponding infrastructure should be deleted. """ - self._init_clients(config=config) + self._init_writer(config=config) assert self._writer is not None # drop fields corresponding to this feature-view @@ -269,20 +269,28 @@ def _create_document( return builder.build() - def _init_clients(self, config: RepoConfig): - """Initializes (if required) reader/writer ikv clients.""" - online_config = config.online_store - assert isinstance(online_config, IKVOnlineStoreConfig) - client_options = IKVOnlineStore._config_to_client_options(online_config) - + def _init_writer(self, config: RepoConfig): + """Initializes ikv writer client.""" # initialize writer if self._writer is None: + online_config = config.online_store + assert isinstance(online_config, IKVOnlineStoreConfig) + client_options = IKVOnlineStore._config_to_client_options(online_config) + self._writer = create_new_writer(client_options) + self._writer.startup() # blocking operation - # initialize reader, iff mount_dir is specified + def _init_reader(self, config: RepoConfig): + """Initializes ikv reader client.""" + # initialize reader if self._reader is None: + online_config = config.online_store + assert isinstance(online_config, IKVOnlineStoreConfig) + client_options = IKVOnlineStore._config_to_client_options(online_config) + if online_config.mount_directory and len(online_config.mount_directory) > 0: self._reader = create_new_reader(client_options) + self._reader.startup() # blocking operation @staticmethod def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions: diff --git a/setup.py b/setup.py index 9181e64c2f..cdab69b684 100644 --- a/setup.py +++ b/setup.py @@ -127,7 +127,7 @@ ] IKV_REQUIRED = [ - "ikvpy>=0.0.23", + "ikvpy>=0.0.36", ] HAZELCAST_REQUIRED = [