[Task]: Enable to set timeout for Python TestPipeline #29646

Open
1 of 16 tasks
Abacn opened this issue Dec 6, 2023 · 7 comments

Comments

@Abacn
Contributor

Abacn commented Dec 6, 2023

What needs to happen?

Currently, TestPipeline waits indefinitely for the pipeline to finish:

state = result.wait_until_finish()

When a test times out, it prints no useful information, only a pytest timeout message and the stack trace at the point where it was interrupted (e.g. https://github.com/apache/beam/runs/19275621816):

Failed: Timeout >4500.0s
self = <apache_beam.transforms.ptransform_test.PTransformTest testMethod=test_flatten_one_single_pcollection>

    @pytest.mark.it_validatesrunner
    def test_flatten_one_single_pcollection(self):
>     with TestPipeline() as pipeline:
...
        while thread.is_alive():
>         time.sleep(5.0)
E         Failed: Timeout >4500.0s

However, DataflowRunner.wait_until_finish() does support a duration argument:

def wait_until_finish(self, duration=None):

and when it times out, it prints the job ID so one can find the Dataflow job to investigate:

assert duration or terminated, (
    'Job did not reach to a terminal state after waiting indefinitely. '
    '{}'.format(consoleUrl))

We should be able to use this functionality in TestPipeline, for example:

with TestPipeline(timeout=600.0) as p:
   ...
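
A minimal sketch of how such a timeout could be threaded through to wait_until_finish(). This is not Beam code: FakeResult and SketchTestPipeline are hypothetical stand-ins, the job_runtime_s knob exists only for the demonstration, and the sketch assumes that duration is given in milliseconds and that None means "wait indefinitely".

```python
import time


class FakeResult:
    """Stand-in for a runner's pipeline result; not a Beam class."""

    def __init__(self, finishes_after_s):
        self._deadline = time.monotonic() + finishes_after_s
        self.state = "RUNNING"

    def wait_until_finish(self, duration=None):
        # Same signature as quoted above. Assumption: `duration` is in
        # milliseconds and None means "wait indefinitely".
        give_up = None
        if duration is not None:
            give_up = time.monotonic() + duration / 1000.0
        while time.monotonic() < self._deadline:
            if give_up is not None and time.monotonic() >= give_up:
                return self.state  # still RUNNING: the wait timed out
            time.sleep(0.01)
        self.state = "DONE"
        return self.state


class SketchTestPipeline:
    """Hypothetical shape of a TestPipeline accepting `timeout` in seconds."""

    def __init__(self, timeout=None, job_runtime_s=0.0):
        self.timeout = timeout
        self._job_runtime_s = job_runtime_s  # demo knob, not a real option

    def run(self):
        result = FakeResult(self._job_runtime_s)
        # Convert seconds to the milliseconds expected by wait_until_finish().
        duration_ms = None if self.timeout is None else int(self.timeout * 1000)
        state = result.wait_until_finish(duration=duration_ms)
        if state != "DONE":
            # A real implementation could include the job ID or console URL here.
            raise TimeoutError(
                "Pipeline still in state %s after %.1fs" % (state, self.timeout))
        return result

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Only run the pipeline if the `with` body itself did not fail.
        if exc_type is None:
            self.run()
        return False
```

With this shape, a test that exceeds its timeout fails with an error raised by the pipeline wrapper itself, which leaves room to report the job details instead of a bare pytest timeout.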

Issue Priority

Priority: 3 (nice-to-have improvement)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@GautamGottipati

.take-issue

@GautamGottipati

@Abacn As far as I understand, we need to add a timeout to TestPipeline. I plan to pass this parameter when creating the TestPipeline, by initializing a variable self.duration = timeout in test_pipeline.__init__(duration=None) and passing it to
state = result.wait_until_finish(duration=duration). However, I don't understand how wait_until_finish() gets called; an explanation would be helpful.
Also, is my understanding correct?

@Tjindl

Tjindl commented Oct 20, 2024

.take-issue

@Tjindl

Tjindl commented Oct 21, 2024

Hello @Abacn, I looked through the Python code to get hints about the Java SDK component. Someone has ticked the Python SDK component, but I suspect it is not solved. Could you please clarify whether the Python SDK component is actually solved or was ticked by mistake?

@derrickaw
Contributor

.take-issue

@Naseer-010

.take-issue

@Naseer-010

Hey @Abacn, I think we just need to add a timeout parameter, as someone mentioned before, and pass it to wait_until_finish(). But should the timeout be in seconds or milliseconds?
I have checked the code, and in wait_until_finish() the duration is in milliseconds. Should I keep milliseconds as is, or would seconds be better? With seconds, users could pass fractional values and would not have to enter large numbers when they want to run for a few minutes.
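
One way to reconcile the two units, sketched under the assumption reported above that wait_until_finish() takes milliseconds: accept seconds at the TestPipeline level and convert once at the boundary. The helper name timeout_to_duration_ms is hypothetical and not part of Beam.

```python
import math


def timeout_to_duration_ms(timeout_s):
    """Convert a user-facing timeout in seconds to whole milliseconds.

    None is passed through unchanged so that "no timeout" keeps meaning
    "wait indefinitely" on the wait_until_finish() side.
    """
    if timeout_s is None:
        return None
    if timeout_s <= 0:
        raise ValueError("timeout must be positive, got %r" % (timeout_s,))
    # Round up so that a very small timeout never becomes 0 ms.
    return math.ceil(timeout_s * 1000)


# Example: ten minutes expressed in seconds.
duration_ms = timeout_to_duration_ms(600.0)
```

Seconds match the example in the issue description (timeout = 600.0) and pytest's own timeout messages, while the conversion keeps the existing millisecond contract untouched.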
