From 6f12670422d7e6f2a87c3186c0c4498d0de7a943 Mon Sep 17 00:00:00 2001 From: Andon Andonov Date: Sun, 30 Jan 2022 20:17:44 +0200 Subject: [PATCH] vdk-core: Address feedback on interface changes Signed-off-by: Andon Andonov --- projects/vdk-core/cicd/build.sh | 7 ++- .../src/vdk/api/plugin/plugin_input.py | 62 ++++++++++++------- .../ingestion/ingester_base.py | 14 ++--- .../ingestion/test_ingester_base.py | 12 ++-- 4 files changed, 56 insertions(+), 39 deletions(-) diff --git a/projects/vdk-core/cicd/build.sh b/projects/vdk-core/cicd/build.sh index 86b1f7e444..fe7dcbc9a9 100755 --- a/projects/vdk-core/cicd/build.sh +++ b/projects/vdk-core/cicd/build.sh @@ -20,8 +20,11 @@ cd .. export PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL:-https://test.pypi.org/simple/} -echo "Update pip to newest version" -pip install -U pip +# The lines below are commented out to unblock CI/CD until the latest pip +# release no longer causes installation issues. +# TODO: uncomment once the latest pip works as expected again. +#echo "Update pip to newest version" +#pip install -U pip echo "install dependencies from requirements.txt (used for development and testing)" pip install --extra-index-url $PIP_EXTRA_INDEX_URL -r requirements.txt diff --git a/projects/vdk-core/src/vdk/api/plugin/plugin_input.py b/projects/vdk-core/src/vdk/api/plugin/plugin_input.py index 688e258460..2b619f6b56 100644 --- a/projects/vdk-core/src/vdk/api/plugin/plugin_input.py +++ b/projects/vdk-core/src/vdk/api/plugin/plugin_input.py @@ -231,7 +231,6 @@ class IIngesterPlugin: """ IngestionMetadata = NewType("IngestionMetadata", Dict) - ExceptionsList = NewType("ExceptionsList", List) def ingest_payload( self, @@ -239,14 +238,14 @@ def ingest_payload( destination_table: Optional[str], target: Optional[str] = None, collection_id: Optional[str] = None, + metadata: Optional[IngestionMetadata] = None, ) -> Optional[IngestionMetadata]: """ Do the actual ingestion of the payload :param payload: List[dict] - The payloads to be ingested. 
Depending on the number of payloads - to be processed, there might be 0 or many dict objects. Each dict - object is a separate payload. + The payload to be ingested, split into 1 or many dictionary + objects. Note: The memory size of the list is dependent on the payload_size_bytes_threshold attribute. :param destination_table: Optional[string] @@ -262,7 +261,7 @@ def ingest_payload( Example: http://example.com// This parameter does not need to be used, in case the - `INGEST_TARGET_DEFAULT` environment variable is set. This can be + `INGEST_TARGET_DEFAULT` configuration variable is set. This can be made by plugins, which may set default value, or it can be overwritten by users. :param collection_id: string @@ -270,6 +269,11 @@ def ingest_payload( method invocations belong to same collection. Defaults to "data_job_name|OpID", meaning all method invocations from a data job run will belong to the same collection. + :param metadata: Optional[IngestionMetadata] dictionary object + containing metadata produced and possibly used by pre-ingest, + ingest, or post-ingest plugins. + NOTE: A read-only parameter. Whatever modifications are done to + this object, once returned, it is treated as a new object. :return: [Optional] IngestionMetadata, containing data about the ingestion process (information about the result from the ingestion @@ -312,7 +316,7 @@ def pre_ingest_process( target: Optional[str] = None, collection_id: Optional[str] = None, metadata: Optional[IngestionMetadata] = None, - ) -> Union[List[Dict], Tuple[List[Dict], IngestionMetadata]]: + ) -> Tuple[List[Dict], Optional[IngestionMetadata]]: """ Do some processing on the ingestion payload before passing it to the actual ingestion. @@ -322,9 +326,18 @@ def pre_ingest_process( implementing this method, and doing some payload processing: .. 
code-block:: python - def pre_ingest_process(self, payload: List[dict]) -> List[dict]: + def pre_ingest_process(self, + payload: List[dict], + destination_table: Optional[str], + target: Optional[str], + collection_id: Optional[str], + metadata: Optional[IngestionMetadata], + ) -> Tuple[List[Dict], Optional[IngestionMetadata]]: # Ensure all values in the payload are strings - return [{k: str(v) for (k,v) in i.items()} for i in payload] + processed_payload = \ + [{k: str(v) for (k,v) in i.items()} for i in payload] + + return processed_payload, metadata :param payload: List[dict] the ingestion payload to be processed. NOTE: A read-only parameter. Whatever modifications are done to @@ -344,7 +357,7 @@ def pre_ingest_process(self, payload: List[dict]) -> List[dict]: Example: http://example.com// This parameter does not need to be used, in case the - `INGEST_TARGET_DEFAULT` environment variable is set. This can be + `INGEST_TARGET_DEFAULT` configuration variable is set. This can be made by plugins, which may set default value, or it can be overwritten by users. NOTE: A read-only parameter. It is not expected to be modified and @@ -356,15 +369,17 @@ def pre_ingest_process(self, payload: List[dict]) -> List[dict]: job run will belong to the same collection. NOTE: A read-only parameter. It is not expected to be modified and returned. - :param metadata: Optional[IngestionMetadata] dict object containing - metadata produced by the pre-ingest plugin, and possibly used by - other pre-ingest, ingest, or post-ingest plugins. + :param metadata: Optional[IngestionMetadata], a dictionary object + containing metadata produced by the pre-ingest plugin, + and possibly used by other pre-ingest, ingest, or post-ingest + plugins. NOTE: A read-only parameter. Whatever modifications are done to this object, once returned, it is treated as a new object. 
- :return: Union[List[Dict], Tuple[List[Dict], IngestionMetadata]], - either only the processed payload objects, or a tuple containing - the processed payload objects and an IngestionMetadata object with - ingestion metadata information. + :return: Tuple[List[Dict], Optional[IngestionMetadata]], a tuple + containing the processed payload objects and an + IngestionMetadata object with ingestion metadata information. + If no metadata is being added or processed, the metadata object + passed to this method can be returned as is (see code block above). :exception: If an exception occurs during this operation, all other pre-processing operations and the ingestion of the payload would @@ -381,7 +396,7 @@ def post_ingest_process( destination_table: Optional[str] = None, target: Optional[str] = None, collection_id: Optional[str] = None, - ingestion_metadata: Optional[IngestionMetadata] = None, + metadata: Optional[IngestionMetadata] = None, exception: Optional[Exception] = None, ) -> Optional[IngestionMetadata]: """ @@ -398,7 +413,7 @@ def post_ingest_process( destination_table: Optional[str], target: Optional[str], collection_id: Optional[str], - ingestion_metadata: Optional[IngestionMetadata], + metadata: Optional[IngestionMetadata], exception: Optional[Exception], ) -> Optional[IngestionMetadata]: @@ -407,7 +422,7 @@ def post_ingest_process( payload_sizes = [sys.getsizeof(i) for i in payload] telemetry['payload_size'] = sum(payload_sizes) telemetry['caught_ingest_exceptions'] = exceptions - telemetry |= ingestion_metadata + telemetry |= metadata # Send telemetry to wherever is needed. result = requests.post('example.com', data=telemetry) @@ -417,9 +432,8 @@ def post_ingest_process( :param payload: Optional[List[dict]] - The payloads that have been ingested. Depending on the number - of payloads to be processed, there might 0 or many dict objects. - Each dict object is a separate payload. 
+ The payload that has been ingested, represented as 0 or many + dictionary objects. NOTE: A read-only parameter. It is not expected to be modified and returned. :param destination_table: Optional[string] @@ -437,7 +451,7 @@ def post_ingest_process( Example: http://example.com// This parameter does not need to be used, in case the - `INGEST_TARGET_DEFAULT` environment variable is set. This can be + `INGEST_TARGET_DEFAULT` configuration variable is set. This can be made by plugins, which may set default value, or it can be overwritten by users. NOTE: A read-only parameter. It is not expected to be modified and @@ -449,7 +463,7 @@ def post_ingest_process( job run will belong to the same collection. NOTE: A read-only parameter. It is not expected to be modified and returned. - :param ingestion_metadata: Optional[IngestionMetadata] + :param metadata: Optional[IngestionMetadata] The metadata from the ingestion operation, that plugin developers have decided to process further. This metadata can be either an IngestionMetadata object (which is effectively a dictionary), or diff --git a/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py b/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py index f0bc0fc4f9..57158edd0e 100644 --- a/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py +++ b/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py @@ -474,7 +474,7 @@ def _payload_poster_thread(self): while self._closed.value == 0: payload: Optional[Tuple] = None ingestion_metadata: Optional[IIngesterPlugin.IngestionMetadata] = None - exceptions: List = list() + exception: Optional[Exception] = None try: payload = self._payloads_queue.get() payload_dict, destination_table, method, target, collection_id = payload @@ -498,15 +498,15 @@ def _payload_poster_thread(self): log.exception( "A configuration error occurred while ingesting data." 
) - exceptions.append(e) + exception = e except UserCodeError as e: self._plugin_errors[UserCodeError].increment() log.exception("An user error occurred while ingesting data.") - exceptions.append(e) + exception = e except Exception as e: self._plugin_errors[PlatformServiceError].increment() log.exception("A platform error occurred while ingesting data.") - exceptions.append(e) + exception = e except Exception as e: self._fail_count.increment() @@ -517,7 +517,7 @@ def _payload_poster_thread(self): "One or more rows were not ingested.\n" "Exception was: {}".format(str(e)) ) - exceptions.append(e) + exception = e finally: self._payloads_queue.task_done() @@ -525,8 +525,8 @@ def _payload_poster_thread(self): try: self._ingester.post_ingest_process( payload=payload_dict, - ingestion_metadata=ingestion_metadata, - exceptions=exceptions, + metadata=ingestion_metadata, + exception=exception, ) except Exception as e: log.info( diff --git a/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py b/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py index 37a6351310..ed960f55e2 100644 --- a/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py +++ b/projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py @@ -190,14 +190,14 @@ def test_ingest_payload_multiple_destinations(): def test_ingest_payload_and_post_ingestion_operation(): test_payload = {"key1": "val1", "key2": "val2", "key3": "val3"} test_aggregated_payload = [{"key1": "val1", "key2": "val2", "key3": "val3"}] - test_ingestion_result = {"some_metadata": "some_ingestion_metadata"} + test_ingestion_metadata = {"some_metadata": "some_ingestion_metadata"} destination_table = "a_destination_table" method = "test_method" target = "some_target" collection_id = "test_job|42a420" ingester_base = create_ingester_base() - ingester_base._ingester.ingest_payload.return_value = test_ingestion_result + 
ingester_base._ingester.ingest_payload.return_value = test_ingestion_metadata ingester_base.send_object_for_ingestion( payload=test_payload, @@ -216,8 +216,8 @@ def test_ingest_payload_and_post_ingestion_operation(): ) ingester_base._ingester.post_ingest_process.assert_called_with( payload=test_aggregated_payload, - ingestion_metadata=test_ingestion_result, - exceptions=[], + metadata=test_ingestion_metadata, + exception=None, ) @@ -243,6 +243,6 @@ def test_post_ingestion_operation_with_exceptions(): ingester_base._ingester.ingest_payload.assert_called_once() ingester_base._ingester.post_ingest_process.assert_called_with( payload=test_aggregated_payload, - ingestion_metadata=None, - exceptions=[test_exception], + metadata=None, + exception=test_exception, )