diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 4851aecae2..d973844531 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -351,3 +351,17 @@ def offline_write_batch( to show progress. """ raise NotImplementedError + + @staticmethod + def validate_data_source( + config: RepoConfig, + data_source: DataSource, + ): + """ + Validates the underlying data source. + + Args: + config: Configuration object used to configure a feature store. + data_source: DataSource object that needs to be validated + """ + data_source.validate(config=config) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 48d2f8ef18..b96c3433ea 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -7,6 +7,7 @@ from feast import importer from feast.batch_feature_view import BatchFeatureView +from feast.data_source import DataSource from feast.entity import Entity from feast.feature_logging import FeatureServiceLoggingSource from feast.feature_service import FeatureService @@ -383,3 +384,10 @@ def retrieve_feature_service_logs( start_date=make_tzaware(start_date), end_date=make_tzaware(end_date), ) + + def validate_data_source( + self, + config: RepoConfig, + data_source: DataSource, + ): + self.offline_store.validate_data_source(config=config, data_source=data_source) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 22f6088474..93077f40b9 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -8,6 +8,7 @@ from tqdm import tqdm from feast import FeatureService, errors +from feast.data_source import DataSource from feast.entity import Entity from feast.feature_view import FeatureView from feast.importer import import_class @@ -352,6 +353,21 @@ def retrieve_online_documents( """ pass + @abstractmethod + def validate_data_source( + self, + config: RepoConfig, + data_source: DataSource, + ): + """ + Validates the underlying data source. + + Args: + config: Configuration object used to configure a feature store. + data_source: DataSource object that needs to be validated + """ + pass + def get_provider(config: RepoConfig) -> Provider: if "." not in config.provider: diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 0b659b960c..296155ce46 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -205,10 +205,11 @@ def plan(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool) project, registry, repo, store = _prepare_registry_and_repo(repo_config, repo_path) if not skip_source_validation: + provider = store._get_provider() data_sources = [t.batch_source for t in repo.feature_views] # Make sure the data source used by this feature view is supported by Feast for data_source in data_sources: - data_source.validate(store.config) + provider.validate_data_source(store.config, data_source) registry_diff, infra_diff, _ = store.plan(repo) click.echo(registry_diff.to_string()) @@ -282,10 +283,11 @@ def apply_total_with_repo_instance( skip_source_validation: bool, ): if not skip_source_validation: + provider = store._get_provider() data_sources = [t.batch_source for t in repo.feature_views] # Make sure the data source used by this feature view is supported by Feast for data_source in data_sources: - data_source.validate(store.config) + provider.validate_data_source(store.config, data_source) # For each object in the registry, determine whether it should be kept or deleted. ( diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index 3b9146a7b9..bd1e247a7b 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -7,6 +7,7 @@ from tqdm import tqdm from feast import Entity, FeatureService, FeatureView, RepoConfig +from feast.data_source import DataSource from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.provider import Provider from feast.infra.registry.base_registry import BaseRegistry @@ -130,3 +131,10 @@ def retrieve_online_documents( ] ]: return [] + + def validate_data_source( + self, + config: RepoConfig, + data_source: DataSource, + ): + pass