Refactor deferrable mode for BeamRunPythonPipelineOperator and BeamRunJavaPipelineOperator #46678
In this PR I have refactored deferrable mode for the Dataflow runner of `BeamRunPythonPipelineOperator` and `BeamRunJavaPipelineOperator`, so that the job is submitted with Apache Beam from a `worker` and its status is then monitored through the Dataflow API from the `triggerer`.

In the current implementation, the operator in deferrable mode uses Apache Beam to both submit and monitor the job from the `triggerer`, which confuses our users. Users enable `deferrable` mode to optimize resources, but the current implementation does not optimize anything; it merely moves Beam's running processes from the `worker` to the `triggerer`.

This PR changes that logic for the Dataflow runner, because with Dataflow it is possible to move job-state monitoring from Apache Beam to the Dataflow API. In my implementation, the operator starts the job using Apache Beam on the `worker`, then uses `is_dataflow_job_id_exist_callback` to wait until the job provides a `dataflow_job_id`. After that, the operator leaves the Apache Beam waiting process and starts a Trigger on the `triggerer` to monitor the job's status using the Dataflow API.
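The flow described above can be sketched in plain Python, independent of Airflow. This is a hypothetical illustration, not the actual operator code: `submit_pipeline`, `get_job_id`, and `defer_to_trigger` are stand-ins for the Beam submission call, the `is_dataflow_job_id_exist_callback` check, and the deferral to the triggerer, respectively.

```python
import time


def wait_for_dataflow_job_id(get_job_id, timeout=10.0, poll_interval=0.1):
    """Poll until the submitted pipeline reports its Dataflow job ID.

    Stands in for the role of is_dataflow_job_id_exist_callback: the
    worker blocks only until the job ID is known, not until the job ends.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        job_id = get_job_id()
        if job_id is not None:
            return job_id
        time.sleep(poll_interval)
    raise TimeoutError("Dataflow job ID was not provided in time")


def run_deferrable_flow(submit_pipeline, get_job_id, defer_to_trigger):
    # 1. Submit the job with Apache Beam on the worker.
    submit_pipeline()
    # 2. Wait only until the job exposes its dataflow_job_id.
    job_id = wait_for_dataflow_job_id(get_job_id)
    # 3. Hand monitoring off to the triggerer, which polls the
    #    Dataflow API instead of keeping a Beam process alive.
    return defer_to_trigger(job_id)
```

The key point of the refactor is step 2: the worker releases its slot as soon as the job ID is available, rather than running Beam's monitoring loop on the triggerer.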