From 0570e6a1a0d2e33f87d126f8f2df6ae4d2bb8db4 Mon Sep 17 00:00:00 2001 From: Andon Andonov Date: Fri, 2 Feb 2024 11:52:31 +0200 Subject: [PATCH] vdk-core: Adopt 'method' argument in pre-process plugins As part of https://github.com/vmware/versatile-data-kit/commit/03dde4524f33285647e48dd59ea287dbad458c79, the `pre_ingest_process` and `post_ingest_process` hooks of the IIngesterPlugin were updated to pass the ingest method to user-implemented ingestion plugins. This change updates the IngesterBase to pass the `method` argument from the ingested payloads to the `pre_ingest_process` and `post_ingest_process` hooks of the registered ingester plugins. Testing Done: Updated tests Signed-off-by: Andon Andonov --- .../builtin_plugins/ingestion/ingester_base.py | 15 ++++++++++++++- .../ingestion/test_ingester_base.py | 4 ++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py index 1a876a321f..c4bb0989e1 100644 --- a/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py @@ -437,9 +437,16 @@ def _payload_poster_thread(self): destination_table: Optional[str] = None target: Optional[str] = None collection_id: Optional[str] = None + method: Optional[str] = None try: payload = self._payloads_queue.get() - payload_obj, destination_table, _, target, collection_id = payload + ( + payload_obj, + destination_table, + method, + target, + collection_id, + ) = payload # If there are any pre-processors set, pass the payload object # through them. @@ -450,6 +457,7 @@ def _payload_poster_thread(self): target=target, collection_id=collection_id, metadata=ingestion_metadata, + method=method, ) # Verify payload after pre-processing it, since this preprocessing might be responsible for @@ -512,6 +520,7 @@ def _payload_poster_thread(self): collection_id=collection_id, metadata=ingestion_metadata, exception=exception, + method=method, ) except Exception as e: resolvable_by = errors.get_exception_resolvable_by(e) @@ -575,6 +584,7 @@ def _pre_process_payload( target: Optional[str], collection_id: Optional[str], metadata: Optional[IIngesterPlugin.IngestionMetadata], + method: Optional[str], ) -> Tuple[List[dict], Optional[IIngesterPlugin.IngestionMetadata]]: for plugin in self._pre_processors: try: @@ -584,6 +594,7 @@ def _pre_process_payload( target=target, collection_id=collection_id, metadata=metadata, + method=method, ) except Exception as e: raise PreProcessPayloadIngestionException( @@ -611,6 +622,7 @@ def _execute_post_process_operations( collection_id: Optional[str], metadata: Optional[IIngesterPlugin.IngestionMetadata], exception: Optional[Exception], + method: Optional[str], ): for plugin in self._post_processors: try: @@ -621,6 +633,7 @@ def _execute_post_process_operations( collection_id=collection_id, metadata=metadata, exception=exception, + method=method, ) except Exception as e: raise PostProcessPayloadIngestionException( diff --git a/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py b/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py index 1c9951917c..67c934ba6d 100644 --- a/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py +++ b/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py @@ -245,6 +245,7 @@ def test_pre_ingestion_operation(): target=shared_test_values.get("target"), collection_id=shared_test_values.get("collection_id"), metadata=None, + method=shared_test_values.get("method"), ) ingester_base._ingester.ingest_payload.assert_called_once() @@ -276,6 +277,7 @@ def test_pre_ingestion_updated_dynamic_params(): target=shared_test_values.get("target"), collection_id=shared_test_values.get("collection_id"), metadata=None, + method=shared_test_values.get("method"), ) ingester_base._ingester.ingest_payload.assert_called_with( collection_id=shared_test_values.get("collection_id"), @@ -335,6 +337,7 @@ def test_ingest_payload_and_post_ingestion_operation(): collection_id=shared_test_values.get("collection_id"), metadata=test_ingestion_metadata, exception=None, + method=shared_test_values.get("method"), ) @@ -361,4 +364,5 @@ def test_post_ingestion_operation_with_exceptions(): collection_id=shared_test_values.get("collection_id"), metadata=None, exception=test_exception, + method=shared_test_values.get("method"), )