From 12d096755d57e161b413c99237e8c29c775fc52c Mon Sep 17 00:00:00 2001
From: tobe
Date: Wed, 11 Aug 2021 16:53:15 +0800
Subject: [PATCH 1/2] Add Iceberg and Hive config to support metastore

---
 java/openmldb-batch/pom.xml                   | 14 +++++++++
 .../openmldb/batch/OpenmldbBatchConfig.scala  |  6 ++++
 .../openmldb/batch/api/OpenmldbSession.scala  | 31 +++++++++++++++----
 3 files changed, 45 insertions(+), 6 deletions(-)
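Usage sketch (editor's addition, not part of the commit; text in this area
after the diffstat is ignored by "git am"). It assumes OpenmldbSession has a
constructor taking a SparkSession, as the diff below suggests, and that
OpenmldbBatchConfig.fromSparkSession reads each ConfigOption from the Spark
conf under a "spark." prefix; that prefix and the metastore URI are
assumptions, not confirmed by this patch.

    import org.apache.spark.sql.SparkSession
    import com._4paradigm.openmldb.batch.api.OpenmldbSession

    val spark = SparkSession.builder()
      .master("local[*]")
      // Assumed key prefix; the option names come from OpenmldbBatchConfig below
      .config("spark.openmldb.enable.hive.metastore", "true")
      .config("spark.openmldb.hive.default.database", "default")
      // Placeholder URI; must point to a running Hive metastore
      .config("spark.hadoop.hive.metastore.uris", "thrift://metastore-host:9083")
      .enableHiveSupport()
      .getOrCreate()

    // The constructor changed below then registers every table from the
    // default Hive database into the OpenMLDB engine
    val sess = new OpenmldbSession(spark)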
sparkConf.set("spark.sql.catalog.%s".format(config.icebergCatalogName), "org.apache.iceberg.spark.SparkCatalog") - sparkConf.set("spark.sql.catalog.%s.type".format(config.icebergCatalogName), "hadoop") - sparkConf.set("spark.sql.catalog.%s.warehouse".format(config.icebergCatalogName), this.config.hadoopWarehousePath) + if (!config.hadoopWarehousePath.isEmpty) { + sparkConf.set("spark.sql.catalog.%s".format(config.icebergCatalogName), "org.apache.iceberg.spark.SparkCatalog") + sparkConf.set("spark.sql.catalog.%s.type".format(config.icebergCatalogName), "hadoop") + sparkConf.set("spark.sql.catalog.%s.warehouse".format(config.icebergCatalogName), this.config.hadoopWarehousePath) + } + + if (config.enableHiveMetaStore) { + sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") + sparkConf.set("spark.sql.catalog.spark_catalog.type", "hive") + // TODO: Check if "hive.metastore.uris" is set or not + } + } /** From da12197808961c3fc1652bbfea8c8be2d0408d70 Mon Sep 17 00:00:00 2001 From: tobe Date: Wed, 11 Aug 2021 17:09:41 +0800 Subject: [PATCH 2/2] Support importing table for hive metastore catalog --- .../openmldb/batch/OpenmldbBatchConfig.scala | 2 +- .../openmldb/batch/api/OpenmldbSession.scala | 53 ++++++++++++++++--- 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala index b81fd513551..aaf35a58ef8 100755 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala @@ -133,7 +133,7 @@ class OpenmldbBatchConfig extends Serializable { var hadoopWarehousePath = "" @ConfigOption(name="openmldb.iceberg.catalog.name", doc="The name of Iceberg catalog") - val icebergCatalogName = "iceberg_catalog" + val icebergHadoopCatalogName = "iceberg_catalog" } diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala index bfd157d30c6..52f289b557b 100755 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.iceberg.PartitionSpec import org.apache.iceberg.catalog.TableIdentifier import org.apache.iceberg.hadoop.HadoopCatalog +import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.QueryPlanningTracker @@ -62,6 +63,7 @@ class OpenmldbSession { // TODO: Register table if using other constructors val catalogTables = this.sparkSession.catalog.listTables(config.defaultHiveDatabase).collect() for (table <- catalogTables) { + logger.info(s"Register table ${table.name} for OpenMLDB engine") registerTable(table.name, this.sparkSession.table(table.name)) } } @@ -111,9 +113,9 @@ class OpenmldbSession { // Set Iceberg catalog if (!config.hadoopWarehousePath.isEmpty) { - sparkConf.set("spark.sql.catalog.%s".format(config.icebergCatalogName), "org.apache.iceberg.spark.SparkCatalog") - sparkConf.set("spark.sql.catalog.%s.type".format(config.icebergCatalogName), "hadoop") - 
sparkConf.set("spark.sql.catalog.%s.warehouse".format(config.icebergCatalogName), this.config.hadoopWarehousePath) + sparkConf.set("spark.sql.catalog.%s".format(config.icebergHadoopCatalogName), "org.apache.iceberg.spark.SparkCatalog") + sparkConf.set("spark.sql.catalog.%s.type".format(config.icebergHadoopCatalogName), "hadoop") + sparkConf.set("spark.sql.catalog.%s.warehouse".format(config.icebergHadoopCatalogName), this.config.hadoopWarehousePath) } if (config.enableHiveMetaStore) { @@ -250,14 +252,35 @@ class OpenmldbSession { * Create table and import data from DataFrame to offline storage. */ def importToOfflineStorage(databaseName: String, tableName: String, df: DataFrame): Unit = { - createOfflineTable(databaseName, tableName, df) - appendOfflineTable(databaseName, tableName, df) + createHiveTable(databaseName, tableName, df) + appendHiveTable(databaseName, tableName, df) } /** * Create table in offline storage. */ - def createOfflineTable(databaseName: String, tableName: String, df: DataFrame): Unit = { + def createHiveTable(databaseName: String, tableName: String, df: DataFrame): Unit = { + // TODO: Check if table exists + + logger.info("Register the table %s to create table in offline storage".format(tableName)) + df.createOrReplaceTempView(tableName) + + val conf = getSparkSession.sessionState.newHadoopConf() + val catalog = new HiveCatalog(conf) + val icebergSchema = SparkSchemaUtil.schemaForTable(getSparkSession, tableName) + val partitionSpec = PartitionSpec.builderFor(icebergSchema).build() + val tableIdentifier = TableIdentifier.of(databaseName, tableName) + + catalog.createTable(tableIdentifier, icebergSchema, partitionSpec) + + // Register table in OpenMLDB engine + registerTable(tableName, df) + } + + /** + * Create table in offline storage. + */ + def createHadoopCatalogTable(databaseName: String, tableName: String, df: DataFrame): Unit = { // TODO: Check if table exists logger.info("Register the table %s to create table in offline storage".format(tableName)) @@ -276,11 +299,25 @@ class OpenmldbSession { /** * Append data from DataFrame to offline storage. */ - def appendOfflineTable(databaseName: String, tableName: String, df: DataFrame): Unit = { + def appendHiveTable(databaseName: String, tableName: String, df: DataFrame): Unit = { + logger.info("Register the table %s to append data in offline storage".format(tableName)) + df.createOrReplaceTempView(tableName) + + // TODO: Support other catalog name if possible + val icebergTableName = "%s.%s.%s".format("spark_catalog", databaseName, tableName) + + // Insert parquet to Iceberg table + getSparkSession.table(tableName).writeTo(icebergTableName).append() + } + + /** + * Append data from DataFrame to offline storage. + */ + def appendHadoopCatalogTable(databaseName: String, tableName: String, df: DataFrame): Unit = { logger.info("Register the table %s to append data in offline storage".format(tableName)) df.createOrReplaceTempView(tableName) - val icebergTableName = "%s.%s.%s".format(config.icebergCatalogName, databaseName, tableName) + val icebergTableName = "%s.%s.%s".format(config.icebergHadoopCatalogName, databaseName, tableName) // Insert parquet to Iceberg table this.getSparkSession.table(tableName).writeTo(icebergTableName).append()