Skip to content

Commit

Permalink
vdk-core: Address feedback on interface changes
Browse files Browse the repository at this point in the history
Signed-off-by: Andon Andonov <[email protected]>
  • Loading branch information
doks5 committed Jan 31, 2022
1 parent 5fef6ca commit 6f12670
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 39 deletions.
7 changes: 5 additions & 2 deletions projects/vdk-core/cicd/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ cd ..

export PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL:-https://test.pypi.org/simple/}

echo "Update pip to newest version"
pip install -U pip
# Below lines are commented out to unblock CI/CD until latest pip no longer
# causes installation issues.
# TODO: uncomment once latest pip starts working as expected.
#echo "Update pip to newest version"
#pip install -U pip

echo "install dependencies from requirements.txt (used for development and testing)"
pip install --extra-index-url $PIP_EXTRA_INDEX_URL -r requirements.txt
Expand Down
62 changes: 38 additions & 24 deletions projects/vdk-core/src/vdk/api/plugin/plugin_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,22 +231,21 @@ class IIngesterPlugin:
"""

IngestionMetadata = NewType("IngestionMetadata", Dict)
ExceptionsList = NewType("ExceptionsList", List)

def ingest_payload(
self,
payload: List[dict],
destination_table: Optional[str],
target: Optional[str] = None,
collection_id: Optional[str] = None,
metadata: Optional[IngestionMetadata] = None,
) -> Optional[IngestionMetadata]:
"""
Do the actual ingestion of the payload
:param payload: List[dict]
The payloads to be ingested. Depending on the number of payloads
to be processed, there might be 0 or many dict objects. Each dict
object is a separate payload.
The payload to be ingested, split into 1 or many dictionary
objects.
Note: The memory size of the list is dependent on the
payload_size_bytes_threshold attribute.
:param destination_table: Optional[string]
Expand All @@ -262,14 +261,19 @@ def ingest_payload(
Example:
http://example.com/<some-api>/<data-source_and-db-table>
This parameter does not need to be used, in case the
`INGEST_TARGET_DEFAULT` environment variable is set. This can be
`INGEST_TARGET_DEFAULT` configuration variable is set. This can be
made by plugins, which may set default value, or it can be
overwritten by users.
:param collection_id: string
(Optional) An identifier to indicate that data from different
method invocations belong to same collection. Defaults to
"data_job_name|OpID", meaning all method invocations from a data
job run will belong to the same collection.
:param metadata: Optional[IngestionMetadata] dictionary object
containing metadata produced and possibly used by pre-ingest,
ingest, or post-ingest plugins.
NOTE: A read-only parameter. Whatever modifications are done to
this object, once returned, it is treated as a new object.
:return: [Optional] IngestionMetadata, containing data about the
ingestion process (information about the result from the ingestion
Expand Down Expand Up @@ -312,7 +316,7 @@ def pre_ingest_process(
target: Optional[str] = None,
collection_id: Optional[str] = None,
metadata: Optional[IngestionMetadata] = None,
) -> Union[List[Dict], Tuple[List[Dict], IngestionMetadata]]:
) -> Tuple[List[Dict], Optional[IngestionMetadata]]:
"""
Do some processing on the ingestion payload before passing it to the
actual ingestion.
Expand All @@ -322,9 +326,18 @@ def pre_ingest_process(
implementing this method, and doing some payload processing:
.. code-block:: python
def pre_ingest_process(self, payload: List[dict]) -> List[dict]:
def pre_ingest_process(self,
payload: List[dict],
destination_table: Optional[str],
target: Optional[str],
collection_id: Optional[str],
metadata: Optional[IngestionMetadata],
) -> Tuple[List[Dict], Optional[IngestionMetadata]]:
# Ensure all values in the payload are strings
return [{k: str(v) for (k,v) in i.items()} for i in payload]
processed_payload = \
[{k: str(v) for (k,v) in i.items()} for i in payload]
return processed_payload, metadata
:param payload: List[dict] the ingestion payload to be processed.
NOTE: A read-only parameter. Whatever modifications are done to
Expand All @@ -344,7 +357,7 @@ def pre_ingest_process(self, payload: List[dict]) -> List[dict]:
Example:
http://example.com/<some-api>/<data-source_and-db-table>
This parameter does not need to be used, in case the
`INGEST_TARGET_DEFAULT` environment variable is set. This can be
`INGEST_TARGET_DEFAULT` configuration variable is set. This can be
made by plugins, which may set default value, or it can be
overwritten by users.
NOTE: A read-only parameter. It is not expected to be modified and
Expand All @@ -356,15 +369,17 @@ def pre_ingest_process(self, payload: List[dict]) -> List[dict]:
job run will belong to the same collection.
NOTE: A read-only parameter. It is not expected to be modified and
returned.
:param metadata: Optional[IngestionMetadata] dict object containing
metadata produced by the pre-ingest plugin, and possibly used by
other pre-ingest, ingest, or post-ingest plugins.
:param metadata: Optional[IngestionMetadata], a dictionary object
containing metadata produced by the pre-ingest plugin,
and possibly used by other pre-ingest, ingest, or post-ingest
plugins.
NOTE: A read-only parameter. Whatever modifications are done to
this object, once returned, it is treated as a new object.
:return: Union[List[Dict], Tuple[List[Dict], IngestionMetadata]],
either only the processed payload objects, or a tuple containing
the processed payload objects and an IngestionMetadata object with
ingestion metadata information.
:return: Tuple[List[Dict], Optional[IngestionMetadata]], a tuple
containing the processed payload objects and an
IngestionMetadata object with ingestion metadata information.
If no metadata is being added or processed, the metadata object
passed to this method can be returned as is (see code block above).
:exception: If an exception occurs during this operation, all other
pre-processing operations and the ingestion of the payload would
Expand All @@ -381,7 +396,7 @@ def post_ingest_process(
destination_table: Optional[str] = None,
target: Optional[str] = None,
collection_id: Optional[str] = None,
ingestion_metadata: Optional[IngestionMetadata] = None,
metadata: Optional[IngestionMetadata] = None,
exception: Optional[Exception] = None,
) -> Optional[IngestionMetadata]:
"""
Expand All @@ -398,7 +413,7 @@ def post_ingest_process(
destination_table: Optional[str],
target: Optional[str],
collection_id: Optional[str],
ingestion_metadata: Optional[IngestionMetadata],
metadata: Optional[IngestionMetadata],
exception: Optional[Exception],
) -> Optional[IngestionMetadata]:
Expand All @@ -407,7 +422,7 @@ def post_ingest_process(
payload_sizes = [sys.getsizeof(i) for i in payload]
telemetry['payload_size'] = sum(payload_sizes)
telemetry['caught_ingest_exceptions'] = exceptions
telemetry |= ingestion_metadata
telemetry |= metadata
# Send telemetry to wherever is needed.
result = requests.post('example.com', data=telemetry)
Expand All @@ -417,9 +432,8 @@ def post_ingest_process(
:param payload: Optional[List[dict]]
The payloads that have been ingested. Depending on the number
of payloads to be processed, there might 0 or many dict objects.
Each dict object is a separate payload.
The payload that has been ingested, represented as 0 or many
dictionary objects.
NOTE: A read-only parameter. It is not expected to be modified and
returned.
:param destination_table: Optional[string]
Expand All @@ -437,7 +451,7 @@ def post_ingest_process(
Example:
http://example.com/<some-api>/<data-source_and-db-table>
This parameter does not need to be used, in case the
`INGEST_TARGET_DEFAULT` environment variable is set. This can be
`INGEST_TARGET_DEFAULT` configuration variable is set. This can be
made by plugins, which may set default value, or it can be
overwritten by users.
NOTE: A read-only parameter. It is not expected to be modified and
Expand All @@ -449,7 +463,7 @@ def post_ingest_process(
job run will belong to the same collection.
NOTE: A read-only parameter. It is not expected to be modified and
returned.
:param ingestion_metadata: Optional[IngestionMetadata]
:param metadata: Optional[IngestionMetadata]
The metadata from the ingestion operation, that plugin developers
have decided to process further. This metadata can be either an
IngestionMetadata object (which is effectively a dictionary), or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ def _payload_poster_thread(self):
while self._closed.value == 0:
payload: Optional[Tuple] = None
ingestion_metadata: Optional[IIngesterPlugin.IngestionMetadata] = None
exceptions: List = list()
exception: Optional[Exception] = None
try:
payload = self._payloads_queue.get()
payload_dict, destination_table, method, target, collection_id = payload
Expand All @@ -498,15 +498,15 @@ def _payload_poster_thread(self):
log.exception(
"A configuration error occurred while ingesting data."
)
exceptions.append(e)
exception = e
except UserCodeError as e:
self._plugin_errors[UserCodeError].increment()
log.exception("An user error occurred while ingesting data.")
exceptions.append(e)
exception = e
except Exception as e:
self._plugin_errors[PlatformServiceError].increment()
log.exception("A platform error occurred while ingesting data.")
exceptions.append(e)
exception = e

except Exception as e:
self._fail_count.increment()
Expand All @@ -517,16 +517,16 @@ def _payload_poster_thread(self):
"One or more rows were not ingested.\n"
"Exception was: {}".format(str(e))
)
exceptions.append(e)
exception = e
finally:
self._payloads_queue.task_done()

# Complete Post-Ingestion operations
try:
self._ingester.post_ingest_process(
payload=payload_dict,
ingestion_metadata=ingestion_metadata,
exceptions=exceptions,
metadata=ingestion_metadata,
exception=exception,
)
except Exception as e:
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,14 @@ def test_ingest_payload_multiple_destinations():
def test_ingest_payload_and_post_ingestion_operation():
test_payload = {"key1": "val1", "key2": "val2", "key3": "val3"}
test_aggregated_payload = [{"key1": "val1", "key2": "val2", "key3": "val3"}]
test_ingestion_result = {"some_metadata": "some_ingestion_metadata"}
test_ingestion_metadata = {"some_metadata": "some_ingestion_metadata"}
destination_table = "a_destination_table"
method = "test_method"
target = "some_target"
collection_id = "test_job|42a420"
ingester_base = create_ingester_base()

ingester_base._ingester.ingest_payload.return_value = test_ingestion_result
ingester_base._ingester.ingest_payload.return_value = test_ingestion_metadata

ingester_base.send_object_for_ingestion(
payload=test_payload,
Expand All @@ -216,8 +216,8 @@ def test_ingest_payload_and_post_ingestion_operation():
)
ingester_base._ingester.post_ingest_process.assert_called_with(
payload=test_aggregated_payload,
ingestion_metadata=test_ingestion_result,
exceptions=[],
metadata=test_ingestion_metadata,
exception=None,
)


Expand All @@ -243,6 +243,6 @@ def test_post_ingestion_operation_with_exceptions():
ingester_base._ingester.ingest_payload.assert_called_once()
ingester_base._ingester.post_ingest_process.assert_called_with(
payload=test_aggregated_payload,
ingestion_metadata=None,
exceptions=[test_exception],
metadata=None,
exception=test_exception,
)

0 comments on commit 6f12670

Please sign in to comment.