-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#1307 Integration test for Streaming Conformance #1366
#1307 Integration test for Streaming Conformance #1366
Conversation
So far I've addded a test case with timeout which can catch #1306 . What other use cases could be useful for testing HyperConformance? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A very interesting solution.
Questions from my side are:
- Were you able to reproduce the Catalyst issue using this approach? E.g. conforming a dataset hangs the computation.
- How long it takes to run one such test?
- I'm wondering how it works in general, we can discuss this tomorrow.
spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/HyperConformance.scala
Show resolved
Hide resolved
...test/scala/za/co/absa/enceladus/conformance/streaming/HyperConformanceIntegrationSuite.scala
Outdated
Show resolved
Hide resolved
.../src/test/scala/za/co/absa/enceladus/conformance/interpreter/fixtures/StreamingFixture.scala
Outdated
Show resolved
Hide resolved
|
.start() | ||
|
||
input.collect().foreach(e => memoryStream.addData(e)) | ||
sink.awaitTermination(interval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
awaitTermination(interval)
is probably a little bit dangerous. If the query is terminated just before the microbatch has been committed, then the whole microbatch will be aborted and there will be zero rows.
Also, you have to assume that your data will be processed within interval
ms. 10s is probably enough, but it's kind of a magic value.
In this case processAllAvailable
would be a good fit, because then you can be sure that all the rows will have been read, no matter to what value interval
has been set to. processAllAvailable
terminates as long as the data source is also finite which is the case here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agreee, but how can you use processAllAvailable
on a test with the catalyst optimizer workaround disabled without having an unreasonable running time for testing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, now I see your need for a timeout. Either you give up on testing with multiple micro-batches and use Trigger.Once
or you could use a margin of safety, say 3 * interval
.
Also, if you really want to cover multiple micro-batches, you need to make sure that the foreach
loop runs during multiple micro-batches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm slightly confused. We need to test 2 things:
- The correctness of the results of streaming conformance. All workarounds should be turned on so such tests can finish in a reasonable time.
- We need to test that workaround works by adding a test that hangs if the workaround is turned off. There is no particular time limit on the test, but should be some reasonable time.
Tests that test (1) should not rely on timing if possible. If processAllAvailable
helps with it - great..
Tests that test (2) might rely on timing, but not necessarily. Currently in batch, if the workaround stops working the corresponding test will just hang forever. Not ideal, but we expect workaround to work. If it stops having an effect, it will be noticed by the developer when the test hangs.
Alternatively, a big timeout can be used to interrupt the test if it takes too long. But the value of the timeout should be set so that test passes on all envs the test could run, including Jenkins VMs.
The reason I care about this so much is that timings introduce non-determinism. Such tests could work perfectly locally on Mac, but start failing on Windows or on Linux build box.
.../src/test/scala/za/co/absa/enceladus/conformance/interpreter/fixtures/StreamingFixture.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. I love when tests are added, obviously. 👍
val frame: DataFrame = testHyperConformance(standardizedDf, | ||
"result", | ||
nestedStructsDS) | ||
frame.show() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use show in tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mostly thought that show would make following what happens at the runtime easier
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand, but in an ideal world, you just look at the bottom where it says that everything passed. In case of an issue, every one of us can write this line by themselves and re-run
assertResult(frame.count())(20) | ||
} | ||
|
||
test("Test without catalyst workaround") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure what this tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is that since the workaround is disabled, the test will timeout and no data will be added to the sink in a reasonable time. Probably makes sense to add more description
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is testing of the presence of a bug is Spark. Seems quite weird to me and I would remove the test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might make sense to keep a commented test that, once uncommented, will demonstrate that the Catalyst issue still exists and the test hangs.
It might be useful in case new version of Spark has this issue fixed. We could try it then to see if the fix covers our use case.
import za.co.absa.enceladus.utils.testUtils.SparkTestBase | ||
|
||
trait StreamingFixture extends FunSuite with SparkTestBase with MockitoSugar { | ||
implicit val menasBaseUrls: List[String] = List() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know it is a matter of taste, but readability wise I like List.empty[String]
the most.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering it's an implicit
that should have a type declaration, I would suggest.
implicit val menasBaseUrls: List[String] = List() | |
implicit val menasBaseUrls: List[String] = List.empty |
.../test/scala/za/co/absa/enceladus/conformance/interpreter/fixtures/NestedStructsFixture.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall. Just a couple of suggestions.
.../src/test/scala/za/co/absa/enceladus/conformance/interpreter/fixtures/StreamingFixture.scala
Outdated
Show resolved
Hide resolved
...test/scala/za/co/absa/enceladus/conformance/streaming/HyperConformanceIntegrationSuite.scala
Outdated
Show resolved
Hide resolved
...test/scala/za/co/absa/enceladus/conformance/streaming/HyperConformanceIntegrationSuite.scala
Outdated
Show resolved
Hide resolved
...test/scala/za/co/absa/enceladus/conformance/streaming/HyperConformanceIntegrationSuite.scala
Outdated
Show resolved
Hide resolved
...test/scala/za/co/absa/enceladus/conformance/streaming/HyperConformanceIntegrationSuite.scala
Outdated
Show resolved
Hide resolved
...test/scala/za/co/absa/enceladus/conformance/streaming/HyperConformanceIntegrationSuite.scala
Show resolved
Hide resolved
...test/scala/za/co/absa/enceladus/conformance/streaming/HyperConformanceIntegrationSuite.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great!
…ing-conformance-integration-test
…ing-conformance-integration-test
Kudos, SonarCloud Quality Gate passed!
|
Release notes: here |
Closes #1307