Skip to content

Commit

Permalink
Refactor deferrable mode for BeamRunPythonPipelineOperator and BeamRunJavaPipelineOperator (#46678)
Browse files Browse the repository at this point in the history
  • Loading branch information
MaksYermak authored Feb 20, 2025
1 parent b95990b commit ee68ddf
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 361 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -95,7 +95,7 @@ def process_fd(
fd,
log: logging.Logger,
process_line_callback: Callable[[str], None] | None = None,
check_job_status_callback: Callable[[], bool | None] | None = None,
is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
):
"""
Print output to logs.
Expand All @@ -117,7 +117,7 @@ def process_fd(
if process_line_callback:
process_line_callback(line)
func_log(line.rstrip("\n"))
if check_job_status_callback and check_job_status_callback():
if is_dataflow_job_id_exist_callback and is_dataflow_job_id_exist_callback():
return


Expand All @@ -126,7 +126,7 @@ def run_beam_command(
log: logging.Logger,
process_line_callback: Callable[[str], None] | None = None,
working_directory: str | None = None,
check_job_status_callback: Callable[[], bool | None] | None = None,
is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None:
"""
Run pipeline command in subprocess.
Expand Down Expand Up @@ -158,16 +158,16 @@ def run_beam_command(
continue

for readable_fd in readable_fds:
process_fd(proc, readable_fd, log, process_line_callback, check_job_status_callback)
if check_job_status_callback and check_job_status_callback():
process_fd(proc, readable_fd, log, process_line_callback, is_dataflow_job_id_exist_callback)
if is_dataflow_job_id_exist_callback and is_dataflow_job_id_exist_callback():
return

if proc.poll() is not None:
break

# Corner case: check if more output was created between the last read and the process termination
for readable_fd in reads:
process_fd(proc, readable_fd, log, process_line_callback, check_job_status_callback)
process_fd(proc, readable_fd, log, process_line_callback, is_dataflow_job_id_exist_callback)

log.info("Process exited with return code: %s", proc.returncode)

Expand Down Expand Up @@ -198,7 +198,7 @@ def _start_pipeline(
command_prefix: list[str],
process_line_callback: Callable[[str], None] | None = None,
working_directory: str | None = None,
check_job_status_callback: Callable[[], bool | None] | None = None,
is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None:
cmd = [*command_prefix, f"--runner={self.runner}"]
if variables:
Expand All @@ -208,7 +208,7 @@ def _start_pipeline(
process_line_callback=process_line_callback,
working_directory=working_directory,
log=self.log,
check_job_status_callback=check_job_status_callback,
is_dataflow_job_id_exist_callback=is_dataflow_job_id_exist_callback,
)

def start_python_pipeline(
Expand All @@ -220,7 +220,7 @@ def start_python_pipeline(
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
process_line_callback: Callable[[str], None] | None = None,
check_job_status_callback: Callable[[], bool | None] | None = None,
is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
):
"""
Start Apache Beam python pipeline.
Expand Down Expand Up @@ -289,7 +289,7 @@ def start_python_pipeline(
variables=variables,
command_prefix=command_prefix,
process_line_callback=process_line_callback,
check_job_status_callback=check_job_status_callback,
is_dataflow_job_id_exist_callback=is_dataflow_job_id_exist_callback,
)

def start_java_pipeline(
Expand All @@ -298,6 +298,7 @@ def start_java_pipeline(
jar: str,
job_class: str | None = None,
process_line_callback: Callable[[str], None] | None = None,
is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None:
"""
Start Apache Beam Java pipeline.
Expand All @@ -316,6 +317,7 @@ def start_java_pipeline(
variables=variables,
command_prefix=command_prefix,
process_line_callback=process_line_callback,
is_dataflow_job_id_exist_callback=is_dataflow_job_id_exist_callback,
)

def start_go_pipeline(
Expand Down
Loading

0 comments on commit ee68ddf

Please sign in to comment.