From 6a661d0b5b139da30794f5f344f4a53554cec121 Mon Sep 17 00:00:00 2001 From: Adrian Olosutean Date: Tue, 2 Jun 2020 17:33:43 +0200 Subject: [PATCH] #1307 Testing with more logging and time between microbatches --- .../interpreter/fixtures/StreamingFixture.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/fixtures/StreamingFixture.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/fixtures/StreamingFixture.scala index e1cc10d8a..aaa66d3af 100644 --- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/fixtures/StreamingFixture.scala +++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/fixtures/StreamingFixture.scala @@ -32,7 +32,7 @@ trait StreamingFixture extends FunSuite with SparkTestBase with MockitoSugar { implicit val menasBaseUrls: List[String] = List() implicit val cmd: ConfCmdConfig = ConfCmdConfig.apply(reportVersion = Some(1)) implicit val infoDateFactory: InfoDateFactory = new InfoDateLiteralFactory("2020-05-23") - private val interval = 10000 + private val processingTime = 1500 protected def testHyperConformance(input: DataFrame, sinkTableName: String, @@ -50,14 +50,18 @@ trait StreamingFixture extends FunSuite with SparkTestBase with MockitoSugar { val conformed: DataFrame = hyperConformance.applyConformanceTransformations(source, dataset) val sink = conformed .writeStream - .trigger(Trigger.ProcessingTime(interval)) + .trigger(Trigger.ProcessingTime(processingTime)) .queryName(sinkTableName) .outputMode("append") .format("memory") .start() - input.collect().foreach(e => memoryStream.addData(e)) - sink.awaitTermination(interval) + input.collect().foreach(e => { + memoryStream.addData(e) + Thread.sleep(100) + spark.sql(s"select * from $sinkTableName").show() + }) + sink.awaitTermination(processingTime * 3) val frame: DataFrame = spark.sql(s"select * from $sinkTableName") frame