Support importing table for hive metastore catalog
tobegit3hub committed Aug 11, 2021
1 parent 12d0967 commit da12197
Showing 2 changed files with 46 additions and 9 deletions.
OpenmldbBatchConfig.scala
@@ -133,7 +133,7 @@ class OpenmldbBatchConfig extends Serializable {
   var hadoopWarehousePath = ""
 
   @ConfigOption(name="openmldb.iceberg.catalog.name", doc="The name of Iceberg catalog")
-  val icebergCatalogName = "iceberg_catalog"
+  val icebergHadoopCatalogName = "iceberg_catalog"
 
 }
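The rename only touches the Scala identifier; the option key declared by @ConfigOption is still openmldb.iceberg.catalog.name. A minimal sketch of overriding the default, assuming the batch config is populated from the Spark configuration (that loading path is outside this diff):

// Hypothetical override; the key comes from the @ConfigOption annotation above.
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("openmldb.iceberg.catalog.name", "my_hadoop_catalog") // default: "iceberg_catalog"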
OpenmldbSession.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.iceberg.PartitionSpec
 import org.apache.iceberg.catalog.TableIdentifier
 import org.apache.iceberg.hadoop.HadoopCatalog
+import org.apache.iceberg.hive.HiveCatalog
 import org.apache.iceberg.spark.SparkSchemaUtil
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.QueryPlanningTracker
@@ -62,6 +63,7 @@ class OpenmldbSession {
     // TODO: Register table if using other constructors
     val catalogTables = this.sparkSession.catalog.listTables(config.defaultHiveDatabase).collect()
     for (table <- catalogTables) {
+      logger.info(s"Register table ${table.name} for OpenMLDB engine")
       registerTable(table.name, this.sparkSession.table(table.name))
     }
   }
@@ -111,9 +113,9 @@
 
     // Set Iceberg catalog
     if (!config.hadoopWarehousePath.isEmpty) {
-      sparkConf.set("spark.sql.catalog.%s".format(config.icebergCatalogName), "org.apache.iceberg.spark.SparkCatalog")
-      sparkConf.set("spark.sql.catalog.%s.type".format(config.icebergCatalogName), "hadoop")
-      sparkConf.set("spark.sql.catalog.%s.warehouse".format(config.icebergCatalogName), this.config.hadoopWarehousePath)
+      sparkConf.set("spark.sql.catalog.%s".format(config.icebergHadoopCatalogName), "org.apache.iceberg.spark.SparkCatalog")
+      sparkConf.set("spark.sql.catalog.%s.type".format(config.icebergHadoopCatalogName), "hadoop")
+      sparkConf.set("spark.sql.catalog.%s.warehouse".format(config.icebergHadoopCatalogName), this.config.hadoopWarehousePath)
     }
 
     if (config.enableHiveMetaStore) {
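With the default name iceberg_catalog, the three properties above produce the standard Iceberg Hadoop-catalog wiring. An equivalent hand-written sketch (warehouse path and table names are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.iceberg_catalog.type", "hadoop")
  .config("spark.sql.catalog.iceberg_catalog.warehouse", "hdfs://ns1/iceberg/warehouse")
  .getOrCreate()

// Tables in this catalog are addressed with a three-part name,
// which is what appendHadoopCatalogTable relies on further down.
spark.sql("SELECT * FROM iceberg_catalog.db1.t1").show()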
@@ -250,14 +252,35 @@
    * Create table and import data from DataFrame to offline storage.
    */
   def importToOfflineStorage(databaseName: String, tableName: String, df: DataFrame): Unit = {
-    createOfflineTable(databaseName, tableName, df)
-    appendOfflineTable(databaseName, tableName, df)
+    createHiveTable(databaseName, tableName, df)
+    appendHiveTable(databaseName, tableName, df)
   }
 
   /**
    * Create table in offline storage.
    */
-  def createOfflineTable(databaseName: String, tableName: String, df: DataFrame): Unit = {
+  def createHiveTable(databaseName: String, tableName: String, df: DataFrame): Unit = {
+    // TODO: Check if table exists
+
+    logger.info("Register the table %s to create table in offline storage".format(tableName))
+    df.createOrReplaceTempView(tableName)
+
+    val conf = getSparkSession.sessionState.newHadoopConf()
+    val catalog = new HiveCatalog(conf)
+    val icebergSchema = SparkSchemaUtil.schemaForTable(getSparkSession, tableName)
+    val partitionSpec = PartitionSpec.builderFor(icebergSchema).build()
+    val tableIdentifier = TableIdentifier.of(databaseName, tableName)
+
+    catalog.createTable(tableIdentifier, icebergSchema, partitionSpec)
+
+    // Register table in OpenMLDB engine
+    registerTable(tableName, df)
+  }
+
+  /**
+   * Create table in offline storage.
+   */
+  def createHadoopCatalogTable(databaseName: String, tableName: String, df: DataFrame): Unit = {
     // TODO: Check if table exists
 
     logger.info("Register the table %s to create table in offline storage".format(tableName))
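createHiveTable drives Iceberg's HiveCatalog directly. A self-contained sketch of the same create flow, using the Iceberg 0.11-era constructor that the commit itself uses (metastore URI and schema are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types

val hadoopConf = new Configuration()
// The catalog talks to the Hive metastore, so the conf must point at one.
hadoopConf.set("hive.metastore.uris", "thrift://metastore-host:9083")

val catalog = new HiveCatalog(hadoopConf)
val schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()))
val spec = PartitionSpec.builderFor(schema).build() // unpartitioned, as in the commit
catalog.createTable(TableIdentifier.of("db1", "t1"), schema, spec)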
@@ -276,11 +299,25 @@
   /**
    * Append data from DataFrame to offline storage.
    */
-  def appendOfflineTable(databaseName: String, tableName: String, df: DataFrame): Unit = {
+  def appendHiveTable(databaseName: String, tableName: String, df: DataFrame): Unit = {
+    logger.info("Register the table %s to append data in offline storage".format(tableName))
+    df.createOrReplaceTempView(tableName)
+
+    // TODO: Support other catalog name if possible
+    val icebergTableName = "%s.%s.%s".format("spark_catalog", databaseName, tableName)
+
+    // Insert parquet to Iceberg table
+    getSparkSession.table(tableName).writeTo(icebergTableName).append()
+  }
+
+  /**
+   * Append data from DataFrame to offline storage.
+   */
+  def appendHadoopCatalogTable(databaseName: String, tableName: String, df: DataFrame): Unit = {
     logger.info("Register the table %s to append data in offline storage".format(tableName))
     df.createOrReplaceTempView(tableName)
 
-    val icebergTableName = "%s.%s.%s".format(config.icebergCatalogName, databaseName, tableName)
+    val icebergTableName = "%s.%s.%s".format(config.icebergHadoopCatalogName, databaseName, tableName)
 
     // Insert parquet to Iceberg table
     this.getSparkSession.table(tableName).writeTo(icebergTableName).append()

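Note that appendHiveTable hard-codes the spark_catalog prefix (see its TODO). For that name to resolve to the Iceberg table created in the metastore, the session catalog is normally routed through Iceberg; a sketch of the usual wiring, which presumably lives behind the enableHiveMetaStore branch not shown in this diff:

// Route the built-in session catalog through Iceberg so that
// writeTo("spark_catalog.db1.t1").append() reaches the Hive-tracked table.
sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
sparkConf.set("spark.sql.catalog.spark_catalog.type", "hive")

End to end, importToOfflineStorage("db1", "t1", df) then creates the table through HiveCatalog and appends df via the v2 writer.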