diff --git a/hm-spark/applications/ingest-from-s3-to-kafka/src/main/scala/com/hongbomiao/IngestFromS3ToKafka.scala b/hm-spark/applications/ingest-from-s3-to-kafka/src/main/scala/com/hongbomiao/IngestFromS3ToKafka.scala
index a8c6cf4f3..5917d1449 100755
--- a/hm-spark/applications/ingest-from-s3-to-kafka/src/main/scala/com/hongbomiao/IngestFromS3ToKafka.scala
+++ b/hm-spark/applications/ingest-from-s3-to-kafka/src/main/scala/com/hongbomiao/IngestFromS3ToKafka.scala
@@ -2,7 +2,7 @@ package com.hongbomiao
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.{col, struct}
-import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
+import org.apache.spark.sql.types.LongType
 import za.co.absa.abris.avro.functions.to_avro
 import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig}
@@ -12,18 +12,14 @@ object IngestFromS3ToKafka {
       .builder()
       .master("local[*]")
       .appName("ingest-from-s3-to-kafka")
+      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+      .config(
+        "spark.sql.catalog.spark_catalog",
+        "org.apache.spark.sql.delta.catalog.DeltaCatalog"
+      )
       .config("spark.ui.port", "4040")
       .getOrCreate()
 
-    val folderPath = "s3a://hongbomiao-bucket/iot/"
-
-    // val parquet_schema = spark.read.parquet("s3a://hongbomiao-bucket/iot/motor.parquet").schema
-    val parquetSchema = new StructType()
-      .add("timestamp", DoubleType)
-      .add("current", DoubleType, nullable = true)
-      .add("voltage", DoubleType, nullable = true)
-      .add("temperature", DoubleType, nullable = true)
-
     val toAvroConfig: ToAvroConfig = AbrisConfig.toConfluentAvro.downloadSchemaByLatestVersion
       .andTopicNameStrategy("hm.motor")
       .usingSchemaRegistry(
         ...
       )
@@ -32,9 +28,8 @@ object IngestFromS3ToKafka {
 
     val df = spark.readStream
-      .schema(parquetSchema)
-      .option("maxFilesPerTrigger", 10)
-      .parquet(folderPath)
+      .format("delta")
+      .load("s3a://hongbomiao-bucket/delta-tables/motor")
       .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
       .select(to_avro(struct("*"), toAvroConfig).as("value"))
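
Note: the last hunk ends at the `.select(...)` projection, so the Kafka sink itself is outside this diff. For context, here is a minimal sketch of how the resulting single Avro-encoded `value` column is typically delivered to Kafka with Spark Structured Streaming; the bootstrap server address and checkpoint location below are illustrative assumptions, not values taken from this change:

```scala
// Sketch only: stream the Avro-encoded `value` column produced above into the
// "hm.motor" topic (matching the ABRiS topic-name strategy configured earlier).
val query = df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker, not from this diff
  .option("topic", "hm.motor")
  .option("checkpointLocation", "/tmp/spark-checkpoints/ingest-from-s3-to-kafka") // assumed path
  .start()

query.awaitTermination()
```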