Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[vdk-core] Implementation of new ingestion interfaces #690

Merged
merged 3 commits into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def __init__(
op_id: str,
ingester: IIngesterPlugin,
ingest_config: IngesterConfiguration,
pre_processors: Optional[List[IIngesterPlugin]] = None,
post_processors: Optional[List[IIngesterPlugin]] = None,
):
"""
This constructor must be called by inheritors.
Expand All @@ -55,10 +57,19 @@ def __init__(
OpId of the data job run.
:param ingest_config: IngesterConfiguration
Configuration related to the core ingestion API.
:param pre_processors: Optional[List[IIngesterPlugin]]
A list of initialized IIngesterPlugin instances, whose purpose
is to process the payload before it is ingested.
:param post_processors: Optional[List[IIngesterPlugin]]
A list of initialized IIngesterPlugin instances, whose purpose
is to process the ingestion metadata after the ingestion of the
payload.
"""
self._data_job_name = data_job_name
self._op_id = op_id
self._ingester = ingester
self._pre_processors = pre_processors
self._post_processors = post_processors
self._number_of_worker_threads = ingest_config.get_number_of_worker_threads()
self._payload_size_bytes_threshold = (
ingest_config.get_payload_size_bytes_threshold()
Expand Down Expand Up @@ -472,67 +483,85 @@ def _payload_poster_thread(self):
data, etc.) and ingesting the data.
"""
while self._closed.value == 0:
payload: Optional[Tuple] = None
ingestion_metadata: Optional[IIngesterPlugin.IngestionMetadata] = None
exception: Optional[Exception] = None
try:
payload = self._payloads_queue.get()
payload_dict, destination_table, method, target, collection_id = payload

ingestion_metadata: Optional[IIngesterPlugin.IngestionMetadata] = None
exception: Optional[Exception] = None
payload_obj: Optional[List] = None
destination_table: Optional[str] = None
target: Optional[str] = None
collection_id: Optional[str] = None
try:
ingestion_metadata = self._ingester.ingest_payload(
payload=payload_dict,
destination_table=destination_table,
target=target,
collection_id=collection_id,
)
payload = self._payloads_queue.get()
payload_obj, destination_table, _, target, collection_id = payload

# If there are any pre-processors set, pass the payload object
# through them.
if self._pre_processors:
payload_obj, ingestion_metadata = self._pre_process_payload(
payload=payload_obj,
destination_table=destination_table,
target=target,
collection_id=collection_id,
metadata=ingestion_metadata,
)

try:
ingestion_metadata = self._ingester.ingest_payload(
payload=payload_obj,
destination_table=destination_table,
target=target,
collection_id=collection_id,
metadata=ingestion_metadata,
)

self._success_count.increment()
except VdkConfigurationError:
# TODO: logging for every error might be too much
# There could be million of uploads and millions of error logs would be hard to use.
# But until we have a way to aggregate the errors and show
# the most relevant errors it's better to make sure we do not hide an issue
# and be verbose.
log.exception(
"A configuration error occurred while ingesting data."
)
raise
except UserCodeError:
log.exception("An user error occurred while ingesting data.")
raise
except Exception:
log.exception("A platform error occurred while ingesting data.")
raise

self._success_count.increment()
except VdkConfigurationError as e:
self._plugin_errors[VdkConfigurationError].increment()
# TODO: logging for every error might be too much
# There could be million of uploads and millions of error logs would be hard to use.
# But until we have a way to aggregate the errors and show
# the most relevant errors it's better to make sure we do not hide an issue
# and be verbose.
log.exception(
"A configuration error occurred while ingesting data."
)
exception = e
except UserCodeError as e:
self._plugin_errors[UserCodeError].increment()
log.exception("An user error occurred while ingesting data.")
exception = e
except Exception as e:
self._plugin_errors[PlatformServiceError].increment()
log.exception("A platform error occurred while ingesting data.")
self._fail_count.increment()
if self._log_upload_errors:
# TODO: When working on row count telemetry we can add the exact number of rows not ingested.
log.warning(
"Failed to ingest a payload. "
"One or more rows were not ingested.\n"
"Exception was: {}".format(str(e))
)
exception = e

except Exception as e:
self._fail_count.increment()
if self._log_upload_errors:
# TODO: When working on row count telemetry we can add the exact number of rows not ingested.
log.warning(
"Failed to ingest a payload. "
"One or more rows were not ingested.\n"
"Exception was: {}".format(str(e))
finally:
self._payloads_queue.task_done()

# If there are any post-processors set, complete the post-process
# operations
if self._post_processors:
self._execute_post_process_operations(
payload=payload_obj,
destination_table=destination_table,
target=target,
collection_id=collection_id,
metadata=ingestion_metadata,
exception=exception,
)
exception = e
finally:
self._payloads_queue.task_done()

# Complete Post-Ingestion operations
try:
self._ingester.post_ingest_process(
payload=payload_dict,
metadata=ingestion_metadata,
exception=exception,
)
except Exception as e:
log.info(
"Could not complete the post-ingestion operation. "
f"The error encountered was: {e}",
)
except VdkConfigurationError:
self._plugin_errors[VdkConfigurationError].increment()
except UserCodeError:
self._plugin_errors[UserCodeError].increment()
except Exception:
self._plugin_errors[PlatformServiceError].increment()

def _start_workers(self):
"""
Expand Down Expand Up @@ -575,6 +604,72 @@ def close_now(self):
f"ingesting plugin errors:{self._plugin_errors}\n\t\t"
)

def _pre_process_payload(
    self,
    payload: List[dict],
    destination_table: Optional[str],
    target: Optional[str],
    collection_id: Optional[str],
    metadata: Optional[IIngesterPlugin.IngestionMetadata],
) -> Tuple[List[dict], Optional[IIngesterPlugin.IngestionMetadata]]:
    """
    Run the payload through every configured pre-processor, in order.

    Each plugin receives the payload and metadata produced by the
    previous plugin, so transformations are chained.

    :param payload: List[dict]
        The payload objects to be transformed before ingestion.
    :param destination_table: Optional[string]
        The table where the payload is headed, forwarded to plugins.
    :param target: Optional[string]
        The ingestion target, forwarded to plugins.
    :param collection_id: Optional[string]
        Identifier grouping the ingested data, forwarded to plugins.
    :param metadata: Optional[IIngesterPlugin.IngestionMetadata]
        Metadata accumulated so far; may be updated by each plugin.
    :return: a tuple of the transformed payload and the resulting
        ingestion metadata.
    """
    processed_payload = payload
    processed_metadata = metadata
    for pre_processor in self._pre_processors:
        try:
            # Chain: output of one plugin is input to the next.
            processed_payload, processed_metadata = pre_processor.pre_ingest_process(
                payload=processed_payload,
                destination_table=destination_table,
                target=target,
                collection_id=collection_id,
                metadata=processed_metadata,
            )
        except Exception as e:
            # Any plugin failure aborts the job to avoid ingesting
            # partially-processed (potentially corrupt) data.
            errors.log_and_throw(
                to_be_fixed_by=ResolvableBy.USER_ERROR,
                log=log,
                what_happened="Failed to pre-process the data.",
                why_it_happened=f"User Error occurred. Exception was: {e}",
                consequences="Execution of the data job will fail, "
                "in order to prevent data corruption.",
                countermeasures="Check if the data sent for ingestion "
                "is aligned with the requirements, "
                "and that the pre-process plugins are "
                "configured correctly.",
            )
    return processed_payload, processed_metadata

def _execute_post_process_operations(
    self,
    payload: List[dict],
    destination_table: Optional[str],
    target: Optional[str],
    collection_id: Optional[str],
    metadata: Optional[IIngesterPlugin.IngestionMetadata],
    exception: Optional[Exception],
):
    """
    Run every configured post-processor over the ingestion outcome.

    Plugins are invoked in order; the metadata returned by one plugin is
    passed to the next, so metadata enrichment is chained.

    :param payload: List[dict]
        The payload that was sent for ingestion.
    :param destination_table: Optional[string]
        The table the payload was ingested into, forwarded to plugins.
    :param target: Optional[string]
        The ingestion target, forwarded to plugins.
    :param collection_id: Optional[string]
        Identifier grouping the ingested data, forwarded to plugins.
    :param metadata: Optional[IIngesterPlugin.IngestionMetadata]
        Metadata produced by the ingestion (and pre-processing) steps.
    :param exception: Optional[Exception]
        The exception raised during ingestion, if any, so plugins can
        react to failures.
    """
    accumulated_metadata = metadata
    for post_processor in self._post_processors:
        try:
            accumulated_metadata = post_processor.post_ingest_process(
                payload=payload,
                destination_table=destination_table,
                target=target,
                collection_id=collection_id,
                metadata=accumulated_metadata,
                exception=exception,
            )
        except Exception as e:
            # A failing post-processor aborts the job to avoid silently
            # skipping required post-ingestion bookkeeping.
            errors.log_and_throw(
                to_be_fixed_by=ResolvableBy.USER_ERROR,
                log=log,
                what_happened="Could not complete post-ingestion process.",
                why_it_happened=f"User Error occurred. Exception was: {e}",
                consequences="Execution of the data job will fail, "
                "in order to prevent data corruption.",
                countermeasures="Check if the data sent for "
                "post-processing "
                "is aligned with the requirements, "
                "and that the post-process plugins are "
                "configured correctly.",
            )

def _handle_results(self):
if self._plugin_errors.get(UserCodeError, AtomicCounter(0)).value > 0:
self._log_and_throw(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional

from vdk.api.plugin.plugin_input import IIngesterPlugin
Expand All @@ -21,6 +22,7 @@
from vdk.internal.core.errors import VdkConfigurationError
from vdk.internal.core.statestore import CommonStoreKeys
from vdk.internal.core.statestore import StateStore
from vdk.internal.util.utils import parse_config_sequence


IngesterPluginFactory = Callable[[], IIngesterPlugin]
Expand Down Expand Up @@ -111,9 +113,10 @@ def send_tabular_data_for_ingestion(
target: Optional[str] = None,
collection_id: Optional[str] = None,
):
# Use the method and target provided by customer, or load the default ones
# if set. `method` and `target` provided when the method is called take
# precedence over the ones set with environment variables.
# Use the method and target provided by customer, or load the
# default ones, if set. `method` and `target` provided when the
# method is called take precedence over the ones set with
# environment variables.
method = method or self._cfg.get_value("INGEST_METHOD_DEFAULT")
target = target or self._cfg.get_value("INGEST_TARGET_DEFAULT")
self._log.info(
Expand Down Expand Up @@ -221,15 +224,57 @@ def __initialize_ingester(self, method) -> IngesterBase:
f"If upgraded recently consider reverting to previous version. Or use another method type.",
)
else:
# Initialize the pre- and post- processors.
initialized_pre_processors = self.__get_initialized_processors(
"INGEST_PAYLOAD_PREPROCESS_SEQUENCE"
)
initialized_post_processors = self.__get_initialized_processors(
"INGEST_PAYLOAD_POSTPROCESS_SEQUENCE"
)

self._cached_ingesters[method] = IngesterBase(
self._state.get(ExecutionStateStoreKeys.JOB_NAME),
self._state.get(CommonStoreKeys.OP_ID),
ingester_plugin,
IngesterConfiguration(config=self._cfg),
pre_processors=initialized_pre_processors,
post_processors=initialized_post_processors,
)

return self._cached_ingesters[method]

def __get_initialized_processors(self, config_var: str) -> List:
    """
    Build an ingestion processor instance for every plugin name listed
    in the given configuration variable.

    :param config_var: string
        Name of the comma-separated configuration variable that lists
        the processor plugin names.
    :return: a list of initialized processor plugins; empty when the
        variable is not set.
    """
    processor_names = parse_config_sequence(self._cfg, key=config_var, sep=",")
    processors = []
    for processor_name in processor_names:
        processors.append(self.__initialize_processor(processor_name))
    return processors

def __initialize_processor(self, method) -> Optional[IIngesterPlugin]:
    """
    Instantiate the pre-/post-processor plugin registered under the
    given method name.

    :param method: string
        Name under which the processor plugin factory is registered.
    :return: the initialized processor plugin.
    :raises: a configuration error (via errors.log_and_throw) when no
        factory is registered for the method, or the factory produced
        no instance.
    """
    try:
        processor_plugin = self._ingester_builders[method]()
    except KeyError:
        # No factory registered under this name; fall through to the
        # configuration-error handling below.
        self._log.error("Could not initialize processor plugin.")
        processor_plugin = None

    if processor_plugin is not None:
        return processor_plugin

    errors.log_and_throw(
        to_be_fixed_by=errors.ResolvableBy.CONFIG_ERROR,
        log=self._log,
        what_happened="Could not create new processor plugin of type"
        f" {method}.",
        why_it_happened=f"VDK was run with method={method}, however "
        "no valid ingestion processor plugin was "
        "created.",
        consequences=errors.MSG_CONSEQUENCE_DELEGATING_TO_CALLER__LIKELY_EXECUTION_FAILURE,
        countermeasures="Seems to be a bug in the plugin for method"
        f" {method}. Make sure it's correctly "
        f"installed. If upgraded recently, consider"
        " reverting to previous version. Or use "
        "another method type.",
    )
    return processor_plugin

def close(self):
"""
Wait for all ingestion payloads to be sent and clean up the queues. This
Expand Down
27 changes: 27 additions & 0 deletions projects/vdk-core/src/vdk/internal/util/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
# SPDX-License-Identifier: Apache-2.0
import inspect
from typing import Any
from typing import List
from typing import Optional

from vdk.internal.core.config import Configuration


def class_fqname(py_object: Any) -> str:
Expand All @@ -13,3 +17,26 @@ def class_fqname(py_object: Any) -> str:
if module is None:
return py_object.__class__.__name__
return f"{py_object.__class__.__module__}.{py_object.__class__.__name__}"


def parse_config_sequence(
    cfg: Configuration, key: str, sep: Optional[str] = None
) -> List:
    """
    Read a configuration variable holding a delimited string and split
    it into a list of whitespace-trimmed tokens.

    :param cfg: Configuration
        The configuration of Versatile Data Kit.
    :param key: string
        The name of the configuration variable that is to be parsed.
    :param sep: string
        Optional. The delimiter by which the configuration variable
        string is to be split. If not specified or set as None, any
        whitespace will be considered a delimiter, and the result from
        the split will have no empty strings.
    :return: the list of stripped tokens, or an empty list when the
        variable is unset or empty.
    """
    raw_value = cfg.get_value(key)
    if not raw_value:
        return []
    # str.split(None) drops empty tokens; an explicit sep preserves them.
    return [token.strip() for token in raw_value.split(sep)]
Loading