From 4a5bc09d78b48504b255428a2b6bd86470a10d04 Mon Sep 17 00:00:00 2001 From: Andon Andonov Date: Tue, 1 Feb 2022 15:44:41 +0200 Subject: [PATCH] [vdk-plugins] Update ingestion interfaces used in plugins As part of https://github.com/vmware/versatile-data-kit/pull/682, the signature of the ingest_payload() method was updated to accept a metadata object as part of the ingestion flow. Once the interface change is adopted in vdk-core, this would cause data jobs to fail, due to missing keyword argument "metadata". This change updates the signatures of the ingest_payload() methods, implemented in the vdk-plugins that provide ingestion capabilities. This is done in order to avoid the abovementioned error. Testing Done: Not required, as no plugins currently process the metadata. Signed-off-by: Andon Andonov --- .../vdk-core/tests/functional/run/test_run_ingest.py | 1 + .../src/vdk/plugin/greenplum/ingest_to_greenplum.py | 1 + .../src/vdk/plugin/ingest_file/ingestion_to_file.py | 4 ++++ .../src/vdk/plugin/ingest_http/ingest_over_http.py | 1 + .../src/vdk/plugin/sqlite/ingest_to_sqlite.py | 10 ++++++++-- .../src/vdk/plugin/test_utils/util_plugins.py | 1 + .../vdk-trino/src/vdk/plugin/trino/ingest_to_trino.py | 7 ++++++- 7 files changed, 22 insertions(+), 3 deletions(-) diff --git a/projects/vdk-core/tests/functional/run/test_run_ingest.py b/projects/vdk-core/tests/functional/run/test_run_ingest.py index fa9b379878..659bbe5b88 100644 --- a/projects/vdk-core/tests/functional/run/test_run_ingest.py +++ b/projects/vdk-core/tests/functional/run/test_run_ingest.py @@ -17,6 +17,7 @@ def ingest_payload( destination_table: Optional[str], target: Optional[str] = None, collection_id: Optional[str] = None, + metadata: Optional[IngestIntoMemoryPlugin.IngestionMetadata] = None, ): raise IndexError("Random error from our plugin") diff --git a/projects/vdk-plugins/vdk-greenplum/src/vdk/plugin/greenplum/ingest_to_greenplum.py b/projects/vdk-plugins/vdk-greenplum/src/vdk/plugin/greenplum/ingest_to_greenplum.py index e305575c35..c5d93ef419 100644 --- a/projects/vdk-plugins/vdk-greenplum/src/vdk/plugin/greenplum/ingest_to_greenplum.py +++ b/projects/vdk-plugins/vdk-greenplum/src/vdk/plugin/greenplum/ingest_to_greenplum.py @@ -28,6 +28,7 @@ def ingest_payload( destination_table: Optional[str], target: Optional[str] = None, collection_id: Optional[str] = None, + metadata: Optional[IIngesterPlugin.IngestionMetadata] = None, ) -> None: """ See parent class doc for details diff --git a/projects/vdk-plugins/vdk-ingest-file/src/vdk/plugin/ingest_file/ingestion_to_file.py b/projects/vdk-plugins/vdk-ingest-file/src/vdk/plugin/ingest_file/ingestion_to_file.py index cff227ee46..00794d3f56 100644 --- a/projects/vdk-plugins/vdk-ingest-file/src/vdk/plugin/ingest_file/ingestion_to_file.py +++ b/projects/vdk-plugins/vdk-ingest-file/src/vdk/plugin/ingest_file/ingestion_to_file.py @@ -23,6 +23,7 @@ def ingest_payload( destination_table: Optional[str], target: Optional[str] = None, collection_id: Optional[str] = None, + metadata: Optional[IIngesterPlugin.IngestionMetadata] = None, ): """ Ingest payload to file. @@ -37,6 +38,9 @@ def ingest_payload( will be ingested. :param collection_id: Optional[string] Optional argument. Currently not used. + :param metadata: + an IngestionMetadata object that contains metadata about the + pre-ingestion and ingestion operations """ json_object = None diff --git a/projects/vdk-plugins/vdk-ingest-http/src/vdk/plugin/ingest_http/ingest_over_http.py b/projects/vdk-plugins/vdk-ingest-http/src/vdk/plugin/ingest_http/ingest_over_http.py index 170215c8d9..c7ead6f237 100644 --- a/projects/vdk-plugins/vdk-ingest-http/src/vdk/plugin/ingest_http/ingest_over_http.py +++ b/projects/vdk-plugins/vdk-ingest-http/src/vdk/plugin/ingest_http/ingest_over_http.py @@ -39,6 +39,7 @@ def ingest_payload( destination_table: Optional[str] = None, target: Optional[str] = None, collection_id: Optional[str] = None, + metadata: Optional[IIngesterPlugin.IngestionMetadata] = None, ) -> Optional[IngestionResult]: header = {"Content-Type": "application/octet-stream"} # TODO: configurable diff --git a/projects/vdk-plugins/vdk-sqlite/src/vdk/plugin/sqlite/ingest_to_sqlite.py b/projects/vdk-plugins/vdk-sqlite/src/vdk/plugin/sqlite/ingest_to_sqlite.py index b99a96595f..28efc35a3d 100644 --- a/projects/vdk-plugins/vdk-sqlite/src/vdk/plugin/sqlite/ingest_to_sqlite.py +++ b/projects/vdk-plugins/vdk-sqlite/src/vdk/plugin/sqlite/ingest_to_sqlite.py @@ -34,6 +34,7 @@ def ingest_payload( destination_table: Optional[str] = None, target: str = None, collection_id: Optional[str] = None, + metadata: Optional[IIngesterPlugin.IngestionMetadata] = None, ) -> None: """ Performs the ingestion @@ -43,9 +44,14 @@ def ingest_payload( :param destination_table: the name of the table receiving the payload in the target database :param target: - the path to the database file; if left None, defaults to VDK_INGEST_TARGET_DEFAULT + the path to the database file; if left None, defaults to + VDK_INGEST_TARGET_DEFAULT :param collection_id: - an identifier specifying that data from different method invocations belongs to the same collection + an identifier specifying that data from different method + invocations belongs to the same collection + :param metadata: + an IngestionMetadata object that contains metadata about the + pre-ingestion and ingestion operations """ target = target or self.conf.get_sqlite_file() if not target: diff --git a/projects/vdk-plugins/vdk-test-utils/src/vdk/plugin/test_utils/util_plugins.py b/projects/vdk-plugins/vdk-test-utils/src/vdk/plugin/test_utils/util_plugins.py index 27c2770c09..326a80938e 100644 --- a/projects/vdk-plugins/vdk-test-utils/src/vdk/plugin/test_utils/util_plugins.py +++ b/projects/vdk-plugins/vdk-test-utils/src/vdk/plugin/test_utils/util_plugins.py @@ -153,6 +153,7 @@ def ingest_payload( destination_table: Optional[str], target: Optional[str] = None, collection_id: Optional[str] = None, + metadata: Optional[IIngesterPlugin.IngestionMetadata] = None, ): self.payloads.append( IngestIntoMemoryPlugin.Payload( diff --git a/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/ingest_to_trino.py b/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/ingest_to_trino.py index dc105f1114..0f92ad655e 100644 --- a/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/ingest_to_trino.py +++ b/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/ingest_to_trino.py @@ -31,6 +31,7 @@ def ingest_payload( destination_table: Optional[str] = None, target: str = None, collection_id: Optional[str] = None, + metadata: Optional[IIngesterPlugin.IngestionMetadata] = None, ) -> None: """ Performs the ingestion @@ -42,7 +43,11 @@ def ingest_payload( this parameter is currently unused TODO: figure out what to use target for :param collection_id: - an identifier specifying that data from different method invocations belongs to the same collection + an identifier specifying that data from different method + invocations belongs to the same collection + :param metadata: + an IngestionMetadata object that contains metadata about the + pre-ingestion and ingestion operations """ log.info(