vdk-core: add flag to enable synchronous/blocking ingestion #698
There seem to be unused imports and failing tests.
projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py
projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_configuration.py
"""
Wait for completion of processing of all data added to the payload
queue and then log the ingestion statistics.
queue.
"""
ingester_utils.wait_completion(
From the context, I would presume that wait_completion in ingester_base.py waits for ingestion completion. In fact, it is wait_send_completion.
def wait_completion(objects_queue: queue.Queue, payloads_queue: queue.Queue):
    objects_queue.join()
    payloads_queue.join()
versatile-data-kit/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_utils.py, line 119 in fab98b7
Looking at the contract of ingest_payload, the abstract queue-polling handler whose completion we wait for, the pydoc says:
Do the actual ingestion of the payload
It does not clearly state whether ingestion is performed synchronously, that is, whether the data is ingested and available when the call returns. I would speculate that "actual ingestion" suggests a synchronous rather than an asynchronous contract.
And we have the clashing contract that is in fact reviewed for deletion in the scope of this MR:
def send_object_for_ingestion(
    self,
    payload: dict,
    destination_table: Optional[str],
    method: Optional[str],
    target: Optional[str] = None,
    collection_id: Optional[str] = None,
):
    """
    Send a self-contained object, asynchronously, for ingestion.
versatile-data-kit/projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py, line 110 in fab98b7
So this MR seems like an attempt to change or improve the contract of the ingestion interface. The contract was ambiguous before the change, and now it is gravitating towards synchronous ingestion.
If an ingestion implementation is in fact asynchronous, the responsibility to wait for ingestion completion shifts entirely to the user, and versatile-data-kit does not support asynchronous ingestion.
If we agree that this conclusion is correct, we could discuss whether we need a contract for both scenarios.
Supporting both sync and async contracts in an abstraction is not unconventional. An example of a platform interface that provides a contract for asynchronous implementations is java.util.concurrent.Future:
A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready.
Or the BlockingQueue interface with add/offer/put: the last one, put, specifies an implementation that waits for the operation to complete.
Our current contract is entirely asynchronous: you call send_object_for_ingestion, and any kind of processing or sending (done by plugins) is done in the background (asynchronously).
We now add a flag which says that the sending/processing of data is still done in the background (async), but every invocation of the ingest method blocks until all payloads are processed (in the background).
In the future I think we should (or may) make send_tabular_data_for_ingestion return a Future/Promise (like java.util.concurrent.Future) and let the user decide per invocation.
Do you have any suggestion for how to phrase the pydoc so the contract is clearer?
I updated the doc of send_** methods to be more clear.
Our current contract is entirely asynchronous: you call send_object_for_ingestion, and any kind of processing or sending (done by plugins) is done in the background (asynchronously).
I would rather say that any kind of sending is done asynchronously, since versatile-data-kit provides that out of the box. The processing itself may be asynchronous or synchronous, depending on the IIngesterPlugin ingest_payload implementation or the data receiver component implementation.
Then, having a method named "wait_completion" in ingestion/ingester_utils.py that is used in ingester_base.py is somewhat deceiving, or simply unclear/confusing: it suggests that we wait for "ingestion completion", yet in fact it waits for "ingestion sending" to complete.
Whether implementation-specific ingestion processing completes on receiving the data-sent response (sync) or some time after receiving it (async) cannot be determined in advance, unless we specify a contract for sync/async ingestion implementations.
If we do not want to introduce such a contract yet, we may want to clarify what activity "wait_completion" waits on.
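To make the distinction concrete, here is a minimal sketch of what joining a queue.Queue actually guarantees. The plugin function, the two lists standing in for a remote backend, and the name wait_send_completion are all assumptions made up for this illustration, not code from vdk-core.

```python
import queue
import threading

# Hypothetical stand-ins for a remote backend.
received_by_backend = []  # payloads the backend has accepted
ingested_by_backend = []  # payloads the backend has finished ingesting


def ingest_payload(payload):
    """Hypothetical plugin call: returns as soon as the data is handed over.

    An asynchronous backend would finish ingestion some time later, so
    nothing is added to ingested_by_backend here.
    """
    received_by_backend.append(payload)


def worker(payloads_queue):
    """Background thread that polls the queue and calls the plugin."""
    while True:
        payload = payloads_queue.get()
        try:
            ingest_payload(payload)
        finally:
            payloads_queue.task_done()


def wait_send_completion(payloads_queue):
    """Block until every queued payload has been passed to the plugin."""
    payloads_queue.join()


payloads_queue = queue.Queue()
threading.Thread(target=worker, args=(payloads_queue,), daemon=True).start()

for i in range(3):
    payloads_queue.put({"id": i})

wait_send_completion(payloads_queue)
```

After the join returns, all three payloads have been handed to the plugin, but the sketch has no way of knowing whether the backend has finished ingesting them. That is the gap a name like wait_send_completion makes explicit.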
It is true that we wait until the plugin acknowledges that data is sent. The plugin implementation itself could be async.
But do you have any proposal for improvement ?
The method "wait_completion" in ingestion/ingester_utils.py that is used in ingester_base.py seems out of context: it presumes ingestion is completed at that point, which may or may not be the case (depending on whether the ingestion implementation is sync or async).
Maybe improve the naming of "wait_completion" to emphasize data sending/consumption instead, so that it is correct in all possible use cases (sync/async).
2af7e43 to c1b5e30
We need to enable data jobs which want to combine ingestion and processing steps in a single job. Currently, because ingestion is asynchronous, if we add an ingestion step and a processing step afterwards, the processing step would not use the latest data.

The life-expectancy job https://github.com/vmware/versatile-data-kit/tree/main/examples/life-expectancy is an example. It first ingests some data into Trino and then cleans it in subsequent steps. What often happens is that the processing steps work with empty tables because the data has not been inserted/ingested yet.

We are adding a new flag, INGESTER_WAIT_TO_FINISH_AFTER_EVERY_SEND, which when set will cause all calls to send_tabular_data_for_ingestion and send_object_for_ingestion to block until the payloads are actually processed and sent (to the plugin).

Testing Done: added unit tests which cover those scenarios.

Signed-off-by: Antoni Ivanov <[email protected]>
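The behaviour of the flag described above can be sketched roughly as follows. SketchIngester and its constructor argument are hypothetical and heavily simplified (a single queue, no batching, no real plugin); the actual vdk-core ingester is more involved.

```python
import queue
import threading


class SketchIngester:
    """Hypothetical, simplified ingester illustrating a wait-after-every-send flag."""

    def __init__(self, wait_to_finish_after_every_send: bool = False):
        self._wait = wait_to_finish_after_every_send
        self._payloads_queue = queue.Queue()
        self.sent = []  # stand-in for "handed over to the plugin"
        threading.Thread(target=self._worker, daemon=True).start()

    def _worker(self):
        # Sending still happens on a background thread in both modes.
        while True:
            payload = self._payloads_queue.get()
            try:
                self.sent.append(payload)
            finally:
                self._payloads_queue.task_done()

    def send_object_for_ingestion(self, payload: dict):
        self._payloads_queue.put(payload)
        if self._wait:
            # Block the caller until the background thread has drained the queue.
            self._payloads_queue.join()


ingester = SketchIngester(wait_to_finish_after_every_send=True)
ingester.send_object_for_ingestion({"id": 1})
sent_after_send = list(ingester.sent)
```

With the flag enabled, a processing step placed right after the send call can rely on the payload having been passed to the plugin. Without it, the send call returns immediately and the list may still be empty at that point.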
c1b5e30 to 9b5b0f2
requested changes addressed
@@ -101,3 +105,16 @@ def add_definitions(config_builder: ConfigurationBuilder):
        " ingester will raise an exception at the end - during finalize_job phase (plugin hook)."
        "By default this will cause the data job to fail (this behaviour can be overridden by plugins).",
    )
    config_builder.add(
        key=INGESTER_WAIT_TO_FINISH_AFTER_EVERY_SEND,
nit: maybe call it INGESTER_SHOULD_WAIT_TO_FINISH_AFTER_EVERY_SEND?
Although I think clarity is more important than length, length still matters. "should" does not really make the variable name more or less clear.