Skip to content

Commit

Permalink
fix concurrency exception in tests (OpenLineage#3316)
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <[email protected]>
  • Loading branch information
pawel-big-lebowski authored Dec 12, 2024
1 parent cb5dc80 commit 595aca8
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ void testKafkaSourceToBatchSink() throws TimeoutException, StreamingQueryExcepti

SparkSession spark =
createSparkSession(server.getAddress().getPort(), "testKafkaSourceToBatchSink");
spark.sparkContext().setLogLevel("ERROR");

Dataset<Row> sourceStream =
readKafkaTopic(spark, kafkaContainer.sourceTopic, bootstrapServers)
Expand Down Expand Up @@ -319,7 +318,6 @@ void testKafkaSourceToJdbcBatchSink()

SparkSession spark =
createSparkSession(server.getAddress().getPort(), "testKafkaSourceToJdbcBatchSink");
spark.sparkContext().setLogLevel("ERROR");

Dataset<Row> sourceStream =
readKafkaTopic(spark, kafkaContainer.sourceTopic, bootstrapServers)
Expand Down Expand Up @@ -390,8 +388,6 @@ void testKafkaClusterResolveNamespace()
SparkSession spark =
createSparkSession(httpServer.getAddress().getPort(), "testKafkaClusterResolveNamespace");

spark.sparkContext().setLogLevel("WARN");

spark
.readStream()
.format("kafka")
Expand Down Expand Up @@ -434,8 +430,6 @@ void readFromCsvFilesInAStreamingMode()
SparkSession spark =
createSparkSession(server.getAddress().getPort(), "testReadFromCsvFilesInAStreamingMode");

spark.sparkContext().setLogLevel("INFO");

spark
.readStream()
.format("csv")
Expand Down
6 changes: 5 additions & 1 deletion integration/spark/app/src/test/resources/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ loggers=openlineage, openlineage-shaded, spark-sql, shutdown-hook-manager, spark
openlineage-argparser, \
spark-sql-catalyst, hive, spark-jetty, spark-storage, spark-scheduler, \
spark-executor, spark-security-manager, spark-context, spark-resource, \
spark-util, spark-env, parquet, hadoop, iceberg, spark-mapred, spark-network, iceberg-hadoop
spark-util, spark-env, parquet, hadoop, iceberg, spark-mapred, spark-network, iceberg-hadoop, \
kafka

logger.openlineage.name = io.openlineage
logger.openlineage.level = info
Expand Down Expand Up @@ -85,6 +86,9 @@ logger.spark-sql-catalyst.level = error
logger.iceberg-hadoop.name = org.apache.iceberg.hadoop
logger.iceberg-hadoop.level = error

logger.kafka.name = org.apache.kafka
logger.kafka.level = warn

# Needs to be at info level for testcontainers to know that the container is "ready"
logger.shutdown-hook-manager.name = org.apache.spark.util.ShutdownHookManager
logger.shutdown-hook-manager.level = info
Expand Down

0 comments on commit 595aca8

Please sign in to comment.