Skip to content

Commit

Permalink
#1307 Testing with more logging and time between microbatches
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian-Olosutean committed Jun 2, 2020
1 parent 31163c5 commit 6a661d0
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ trait StreamingFixture extends FunSuite with SparkTestBase with MockitoSugar {
implicit val menasBaseUrls: List[String] = List()
implicit val cmd: ConfCmdConfig = ConfCmdConfig.apply(reportVersion = Some(1))
implicit val infoDateFactory: InfoDateFactory = new InfoDateLiteralFactory("2020-05-23")
private val interval = 10000
private val processingTime = 1500

protected def testHyperConformance(input: DataFrame,
sinkTableName: String,
Expand All @@ -50,14 +50,18 @@ trait StreamingFixture extends FunSuite with SparkTestBase with MockitoSugar {
val conformed: DataFrame = hyperConformance.applyConformanceTransformations(source, dataset)
val sink = conformed
.writeStream
.trigger(Trigger.ProcessingTime(interval))
.trigger(Trigger.ProcessingTime(processingTime))
.queryName(sinkTableName)
.outputMode("append")
.format("memory")
.start()

input.collect().foreach(e => memoryStream.addData(e))
sink.awaitTermination(interval)
input.collect().foreach(e => {
memoryStream.addData(e)
Thread.sleep(100)
spark.sql(s"select * from $sinkTableName").show()
})
sink.awaitTermination(processingTime * 3)

val frame: DataFrame = spark.sql(s"select * from $sinkTableName")
frame
Expand Down

0 comments on commit 6a661d0

Please sign in to comment.