From fd0fff859cf149e31f98f229cfd9fc18ae0b381e Mon Sep 17 00:00:00 2001
From: tobe
Date: Tue, 20 Jul 2021 11:20:13 +0800
Subject: [PATCH 1/4] Add Iceberg dependencies and set the Iceberg catalog from config

---
 java/openmldb-batch/pom.xml                      |  6 ++++++
 .../openmldb/batch/OpenmldbBatchConfig.scala     |  6 ++++++
 .../openmldb/batch/api/OpenmldbSession.scala     | 20 ++++++++++++++++---
 3 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/java/openmldb-batch/pom.xml b/java/openmldb-batch/pom.xml
index 5128c95e782..b20a52bf3df 100644
--- a/java/openmldb-batch/pom.xml
+++ b/java/openmldb-batch/pom.xml
@@ -332,6 +332,12 @@
             <version>2.7.4</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-spark3-runtime</artifactId>
+            <version>0.11.1</version>
+        </dependency>
+
diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala
index 1c83e215610..ff1f0827a6d 100755
--- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala
@@ -123,6 +123,12 @@ class OpenmldbBatchConfig extends Serializable {
   //@ConfigOption(name="openmldb.hybridse.jsdk.path", doc="The path of HybridSE jsdk core file path")
   var hybridseJsdkLibraryPath = ""
 
+  @ConfigOption(name="openmldb.hadoop.warehouse.path", doc="The path of Hadoop warehouse")
+  var hadoopWarehousePath = "hdfs://172.27.128.215/user/tobe/iceberg_demo5/"
+
+  @ConfigOption(name="openmldb.iceberg.catalog.name", doc="The name of Iceberg catalog")
+  val icebergCatalogName = "iceberg_catalog"
+
 }
diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala
index 84881765337..b7819412487 100755
--- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala
@@ -52,7 +52,7 @@ class OpenmldbSession {
     this()
     this.sparkSession = sparkSession
     this.config = OpenmldbBatchConfig.fromSparkSession(sparkSession)
-    this.sparkSession.conf.set("spark.sql.session.timeZone", config.timeZone)
+    this.setDefaultSparkConfig()
   }
 
   /**
@@ -74,18 +74,32 @@ class OpenmldbSession {
         val builder = SparkSession.builder()
 
         // TODO: Need to set for official Spark 2.3.0 jars
-        logger.debug("Set spark.hadoop.yarn.timeline-service.enabled as false")
-        builder.config("spark.hadoop.yarn.timeline-service.enabled", value = false)
+        //logger.debug("Set spark.hadoop.yarn.timeline-service.enabled as false")
+        //builder.config("spark.hadoop.yarn.timeline-service.enabled", value = false)
+        builder.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
 
         this.sparkSession = builder.appName("App")
           .master(sparkMaster)
           .getOrCreate()
+
+        this.setDefaultSparkConfig()
       }
 
       this.sparkSession
     }
   }
 
+  def setDefaultSparkConfig() = {
+    val sparkConf = this.sparkSession.conf
+    // Set timezone
+    sparkConf.set("spark.sql.session.timeZone", config.timeZone)
+
+    // Set Iceberg catalog
+    sparkConf.set("spark.sql.catalog.%s".format(config.icebergCatalogName), "org.apache.iceberg.spark.SparkCatalog")
+    sparkConf.set("spark.sql.catalog.%s.type".format(config.icebergCatalogName), "hadoop")
+    sparkConf.set("spark.sql.catalog.%s.warehouse".format(config.icebergCatalogName), this.config.hadoopWarehousePath)
+  }
+
   /**
    * Read the file to get a DataFrame using the Spark API.
   *
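Note on [PATCH 1/4]: the three spark.sql.catalog.* entries written by setDefaultSparkConfig() are the standard way to register an Iceberg HadoopCatalog on Spark 3. A minimal sketch of a session configured the same way by hand (the catalog name matches the config default "iceberg_catalog"; the warehouse path, database, and table names are hypothetical placeholders):

    import org.apache.spark.sql.SparkSession

    object IcebergCatalogSketch {
      def main(args: Array[String]): Unit = {
        // Mirrors what OpenmldbSession sets from OpenmldbBatchConfig:
        // extensions, catalog implementation, catalog type, and warehouse path.
        val spark = SparkSession.builder()
          .master("local[*]")
          .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
          .config("spark.sql.catalog.iceberg_catalog.type", "hadoop")
          .config("spark.sql.catalog.iceberg_catalog.warehouse", "hdfs://namenode/path/to/warehouse") // hypothetical path
          .getOrCreate()

        // Iceberg tables are then addressed as <catalog>.<database>.<table>
        spark.sql("SELECT * FROM iceberg_catalog.demo_db.demo_table").show()
      }
    }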
   *

From c6d07ef0be538b1919f3cb27b27a24a72c716d0a Mon Sep 17 00:00:00 2001
From: tobe
Date: Tue, 20 Jul 2021 11:20:32 +0800
Subject: [PATCH 2/4] Fix the end2end window test case

---
 .../com/_4paradigm/openmldb/batch/end2end/TestWindow.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/end2end/TestWindow.scala b/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/end2end/TestWindow.scala
index e2bc5eac7a1..21a18f3dfc3 100644
--- a/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/end2end/TestWindow.scala
+++ b/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/end2end/TestWindow.scala
@@ -17,6 +17,7 @@
 package com._4paradigm.openmldb.batch.end2end
 
 import com._4paradigm.openmldb.batch.api.OpenmldbSession
+import com._4paradigm.openmldb.batch.utils.SparkUtil
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SparkSession}
 import org.scalatest.FunSuite
@@ -54,11 +55,9 @@
     val sqlText = "SELECT sum(trans_amount) OVER w AS w_sum_amount FROM t1 WINDOW w AS (PARTITION BY user ORDER BY trans_time ROWS BETWEEN 10 PRECEDING AND CURRENT ROW)"
 
     val outputDf = sess.sql(sqlText)
-    sess.version()
-
-    //val sparksqlOutputDf = sess.sparksql(sqlText)
+    val sparksqlOutputDf = sess.sparksql(sqlText)
     // Notice that the sum column type is different for SparkSQL and SparkFE
-    //assert(SparkUtil.approximateDfEqual(outputDf.getSparkDf(), sparksqlOutputDf, false))
+    assert(SparkUtil.approximateDfEqual(outputDf.getSparkDf(), sparksqlOutputDf, false))
   }
 }

From 0d9e24810a6a682133010e67ad4249f38b6fc5df Mon Sep 17 00:00:00 2001
From: tobe
Date: Tue, 20 Jul 2021 11:22:19 +0800
Subject: [PATCH 3/4] Reset the default value of the OpenMLDB batch config

---
 .../com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala
index ff1f0827a6d..3fa652f51bf 100755
--- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala
@@ -124,7 +124,7 @@ class OpenmldbBatchConfig extends Serializable {
   var hybridseJsdkLibraryPath = ""
 
   @ConfigOption(name="openmldb.hadoop.warehouse.path", doc="The path of Hadoop warehouse")
-  var hadoopWarehousePath = "hdfs://172.27.128.215/user/tobe/iceberg_demo5/"
+  var hadoopWarehousePath = ""
 
   @ConfigOption(name="openmldb.iceberg.catalog.name", doc="The name of Iceberg catalog")
   val icebergCatalogName = "iceberg_catalog"
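Note on [PATCH 2/4]: SparkUtil.approximateDfEqual is the project's own test helper; its implementation is not part of this series. A rough, hypothetical sketch of the kind of type-tolerant comparison the test relies on (the name, signature, and tolerance below are assumptions, not the actual helper):

    import org.apache.spark.sql.DataFrame

    object DfCompareSketch {
      // Hypothetical: compare two DataFrames row by row, coercing numeric
      // values to Double so that e.g. a LongType sum equals a DoubleType sum,
      // which is why the test passes false for the schema check. Assumes both
      // DataFrames return rows in the same order.
      def approxEqual(left: DataFrame, right: DataFrame, checkSchema: Boolean): Boolean = {
        if (checkSchema && left.schema != right.schema) {
          false
        } else {
          val l = left.collect()
          val r = right.collect()
          l.length == r.length && l.zip(r).forall { case (row1, row2) =>
            row1.toSeq.zip(row2.toSeq).forall {
              case (a: Number, b: Number) => math.abs(a.doubleValue() - b.doubleValue()) < 1e-6
              case (a, b) => a == b
            }
          }
        }
      }
    }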
From 603a19c7eda9fa626915f9044ad1a0b4857b8aae Mon Sep 17 00:00:00 2001
From: tobe
Date: Tue, 20 Jul 2021 15:41:29 +0800
Subject: [PATCH 4/4] Add new APIs to import tables into offline storage

---
 .../openmldb/batch/api/OpenmldbSession.scala | 47 ++++++++++++++++++-
 1 file changed, 46 insertions(+), 1 deletion(-)

diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala
index b7819412487..069dd443512 100755
--- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala
@@ -18,6 +18,11 @@ package com._4paradigm.openmldb.batch.api
 
 import com._4paradigm.openmldb.batch.{OpenmldbBatchConfig, SparkPlanner}
 import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.iceberg.PartitionSpec
+import org.apache.iceberg.catalog.TableIdentifier
+import org.apache.iceberg.hadoop.HadoopCatalog
+import org.apache.iceberg.spark.SparkSchemaUtil
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -89,7 +94,7 @@ class OpenmldbSession {
     }
   }
 
-  def setDefaultSparkConfig() = {
+  def setDefaultSparkConfig(): Unit = {
     val sparkConf = this.sparkSession.conf
     // Set timezone
     sparkConf.set("spark.sql.session.timeZone", config.timeZone)
@@ -222,4 +227,44 @@
     sparkSession.stop()
   }
 
+  /**
+   * Create a table and import data from a DataFrame into offline storage.
+   */
+  def importToOfflineStorage(databaseName: String, tableName: String, df: DataFrame): Unit = {
+    createOfflineTable(databaseName, tableName, df)
+    appendOfflineTable(databaseName, tableName, df)
+  }
+
+  /**
+   * Create a table in offline storage.
+   */
+  def createOfflineTable(databaseName: String, tableName: String, df: DataFrame): Unit = {
+    // TODO: Check if the table already exists
+
+    logger.info("Register the table %s to create the table in offline storage".format(tableName))
+    df.createOrReplaceTempView(tableName)
+
+    val hadoopConfiguration = new Configuration()
+    val hadoopCatalog = new HadoopCatalog(hadoopConfiguration, config.hadoopWarehousePath)
+    val icebergSchema = SparkSchemaUtil.schemaForTable(this.getSparkSession, tableName)
+    val partitionSpec = PartitionSpec.builderFor(icebergSchema).build()
+    val tableIdentifier = TableIdentifier.of(databaseName, tableName)
+
+    // Create the Iceberg table
+    hadoopCatalog.createTable(tableIdentifier, icebergSchema, partitionSpec)
+  }
+
+  /**
+   * Append data from a DataFrame to offline storage.
+   */
+  def appendOfflineTable(databaseName: String, tableName: String, df: DataFrame): Unit = {
+    logger.info("Register the table %s to append data in offline storage".format(tableName))
+    df.createOrReplaceTempView(tableName)
+
+    val icebergTableName = "%s.%s.%s".format(config.icebergCatalogName, databaseName, tableName)
+
+    // Append the registered temp view to the Iceberg table
+    this.getSparkSession.table(tableName).writeTo(icebergTableName).append()
+  }
+
 }
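Note on [PATCH 4/4]: a minimal end-to-end sketch of the new import APIs (the warehouse path, database name, table name, and sample rows are hypothetical; it assumes the openmldb.hadoop.warehouse.path setting is picked up from the Spark conf by OpenmldbBatchConfig):

    import com._4paradigm.openmldb.batch.api.OpenmldbSession
    import org.apache.spark.sql.SparkSession

    object ImportToOfflineStorageSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .config("openmldb.hadoop.warehouse.path", "hdfs://namenode/user/demo/warehouse/") // hypothetical path
          .getOrCreate()
        val sess = new OpenmldbSession(spark)

        // Hypothetical sample data with the (user, trans_amount) columns used in the tests
        val df = spark.createDataFrame(Seq(("tom", 100.0), ("amy", 250.5))).toDF("user", "trans_amount")

        // Creates the Iceberg table from the DataFrame schema, then appends the rows
        sess.importToOfflineStorage("demo_db", "t1", df)

        // Read the data back through the Iceberg catalog configured by the session
        spark.table("iceberg_catalog.demo_db.t1").show()
      }
    }

Design note: createOfflineTable derives the Iceberg schema from a registered temp view via SparkSchemaUtil.schemaForTable and builds an empty PartitionSpec, so every table imported this way is currently unpartitioned.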