[590] Add Hudi HMS Catalog Sync Implementation
Vamsi committed Feb 13, 2025
1 parent 2e71e15 commit 619672e
Showing 15 changed files with 1,146 additions and 20 deletions.
8 changes: 8 additions & 0 deletions xtable-hive-metastore/pom.xml
@@ -42,6 +42,14 @@
<scope>provided</scope>
</dependency>

<!-- Hudi dependencies -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hive-sync</artifactId>
<version>${hudi.version}</version>
<scope>provided</scope>
</dependency>

<!-- Iceberg dependencies -->
<dependency>
<groupId>org.apache.iceberg</groupId>
xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java
@@ -26,6 +26,8 @@
import lombok.RequiredArgsConstructor;
import lombok.ToString;

import org.apache.hudi.hive.MultiPartKeysValueExtractor;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -43,6 +45,15 @@ public class HMSCatalogConfig {
@JsonProperty("externalCatalog.hms.serverUrl")
private final String serverUrl;

@JsonProperty("externalCatalog.hms.schema_string_length_thresh")
private final int schemaLengthThreshold = 4000;

@JsonProperty("externalCatalog.hms.partition_extractor_class")
private final String partitionExtractorClass = MultiPartKeysValueExtractor.class.getName();

@JsonProperty("externalCatalog.hms.max_partitions_per_request")
private final int maxPartitionsPerRequest = 1000;

protected static HMSCatalogConfig of(Map<String, String> properties) {
try {
return OBJECT_MAPPER.readValue(
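The three new knobs bind to external catalog properties through the @JsonProperty keys above. A minimal sketch of a property map that HMSCatalogConfig.of(...) would parse (values here are illustrative; omitted keys fall back to the field defaults of 4000, MultiPartKeysValueExtractor, and 1000):

// Illustrative property map; keys come from the @JsonProperty annotations above.
Map<String, String> props = new HashMap<>();
props.put("externalCatalog.hms.serverUrl", "thrift://metastore:9083");
props.put("externalCatalog.hms.schema_string_length_thresh", "8000");
props.put("externalCatalog.hms.partition_extractor_class",
    MultiPartKeysValueExtractor.class.getName());
props.put("externalCatalog.hms.max_partitions_per_request", "500");
// The protected factory binds these via Jackson (callable within org.apache.xtable.hms):
HMSCatalogConfig config = HMSCatalogConfig.of(props);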
xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogPartitionSyncOperations.java
@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.hms;

import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import lombok.extern.log4j.Log4j2;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.exception.TableNotFoundException;

import org.apache.xtable.catalog.CatalogPartition;
import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
import org.apache.xtable.exception.CatalogSyncException;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;

@Log4j2
public class HMSCatalogPartitionSyncOperations implements CatalogPartitionSyncOperations {

private final IMetaStoreClient metaStoreClient;
private final HMSCatalogConfig catalogConfig;

public HMSCatalogPartitionSyncOperations(
IMetaStoreClient metaStoreClient, HMSCatalogConfig hmsCatalogConfig) {
this.metaStoreClient = metaStoreClient;
this.catalogConfig = hmsCatalogConfig;
}

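// Fetches every partition HMS has registered for the table; (short) -1 means no limit on the listing.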
@Override
public List<CatalogPartition> getAllPartitions(CatalogTableIdentifier catalogTableIdentifier) {
HierarchicalTableIdentifier tableIdentifier =
toHierarchicalTableIdentifier(catalogTableIdentifier);
try {
return metaStoreClient
.listPartitions(
tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), (short) -1)
.stream()
.map(p -> new CatalogPartition(p.getValues(), p.getSd().getLocation()))
.collect(Collectors.toList());
} catch (TException e) {
throw new CatalogSyncException(
"Failed to get all partitions for table " + tableIdentifier, e);
}
}

@Override
public void addPartitionsToTable(
CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToAdd) {
HierarchicalTableIdentifier tableIdentifier =
toHierarchicalTableIdentifier(catalogTableIdentifier);
if (partitionsToAdd.isEmpty()) {
log.info("No partitions to add for " + tableIdentifier);
return;
}
log.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableIdentifier);
try {
StorageDescriptor sd =
metaStoreClient
.getTable(tableIdentifier.getDatabaseName(), tableIdentifier.getTableName())
.getSd();
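// Send the new partitions in batches; batch size is capped by externalCatalog.hms.max_partitions_per_request.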
for (List<CatalogPartition> batch :
CollectionUtils.batches(partitionsToAdd, catalogConfig.getMaxPartitionsPerRequest())) {
List<org.apache.hadoop.hive.metastore.api.Partition> partitionList = new ArrayList<>();
batch.forEach(
partition -> {
StorageDescriptor partitionSd = new StorageDescriptor();
partitionSd.setCols(sd.getCols());
partitionSd.setInputFormat(sd.getInputFormat());
partitionSd.setOutputFormat(sd.getOutputFormat());
partitionSd.setSerdeInfo(sd.getSerdeInfo());

partitionSd.setLocation(partition.getStorageLocation());
partitionList.add(
new org.apache.hadoop.hive.metastore.api.Partition(
partition.getValues(),
tableIdentifier.getDatabaseName(),
tableIdentifier.getTableName(),
0,
0,
partitionSd,
null));
});
metaStoreClient.add_partitions(partitionList, true, false);
log.info("Add batch partitions done: " + partitionList.size());
}
} catch (TException e) {
log.error("{} add partition failed", tableIdentifier, e);
throw new CatalogSyncException(tableIdentifier + " add partition failed", e);
}
}

@Override
public void updatePartitionsToTable(
CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> changedPartitions) {
HierarchicalTableIdentifier tableIdentifier =
toHierarchicalTableIdentifier(catalogTableIdentifier);
try {
Table table =
metaStoreClient.getTable(
tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
StorageDescriptor tableSd = table.getSd();

List<org.apache.hadoop.hive.metastore.api.Partition> updatedPartitions = new ArrayList<>();

changedPartitions.forEach(
p -> {
StorageDescriptor partitionSd = new StorageDescriptor(tableSd);
partitionSd.setLocation(p.getStorageLocation());

org.apache.hadoop.hive.metastore.api.Partition partition =
new org.apache.hadoop.hive.metastore.api.Partition();
partition.setDbName(tableIdentifier.getDatabaseName());
partition.setTableName(tableIdentifier.getTableName());
partition.setValues(p.getValues());
partition.setSd(partitionSd);
updatedPartitions.add(partition);
});

// Update partitions (drop existing and add new ones with updated locations)
for (org.apache.hadoop.hive.metastore.api.Partition partition : updatedPartitions) {
metaStoreClient.dropPartition(
tableIdentifier.getDatabaseName(),
tableIdentifier.getTableName(),
partition.getValues(),
false);
metaStoreClient.add_partition(partition);
}
} catch (TException e) {
throw new CatalogSyncException(
"Failed to update partitions for the table " + tableIdentifier, e);
}
}

@Override
public void dropPartitions(
CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToDrop) {
HierarchicalTableIdentifier tableIdentifier =
toHierarchicalTableIdentifier(catalogTableIdentifier);
try {
for (CatalogPartition partition : partitionsToDrop) {
metaStoreClient.dropPartition(
tableIdentifier.getDatabaseName(),
tableIdentifier.getTableName(),
partition.getValues(),
false);
}
} catch (TException e) {
throw new CatalogSyncException("Failed to drop partitions for table " + tableIdentifier, e);
}
}

@Override
public Map<String, String> getTableProperties(
CatalogTableIdentifier catalogTableIdentifier, List<String> keysToRetrieve) {
HierarchicalTableIdentifier tableIdentifier =
toHierarchicalTableIdentifier(catalogTableIdentifier);
try {
Table table =
metaStoreClient.getTable(
tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
Map<String, String> tableParameters = table.getParameters();

return keysToRetrieve.stream()
.filter(tableParameters::containsKey)
.collect(Collectors.toMap(key -> key, tableParameters::get));
} catch (TableNotFoundException | TException e) {
throw new CatalogSyncException(
"failed to fetch last time synced properties for table" + tableIdentifier, e);
}
}

@Override
public void updateTableProperties(
CatalogTableIdentifier catalogTableIdentifier, Map<String, String> propertiesToUpdate) {
HierarchicalTableIdentifier tableIdentifier =
toHierarchicalTableIdentifier(catalogTableIdentifier);
try {
if (propertiesToUpdate == null || propertiesToUpdate.isEmpty()) {
return;
}

Table table =
metaStoreClient.getTable(
tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
Map<String, String> tableParameters = table.getParameters();
tableParameters.putAll(propertiesToUpdate);
table.setParameters(tableParameters);
metaStoreClient.alter_table(
tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), table);
} catch (TableNotFoundException | TException e) {
throw new CatalogSyncException(
"failed to update last time synced properties for table" + tableIdentifier, e);
}
}
}
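For orientation, a minimal usage sketch of the new operations class, assuming an already-initialized IMetaStoreClient, an HMSCatalogConfig, and a CatalogTableIdentifier (variable names are illustrative, not part of the commit):

// Illustrative only: list the partitions HMS currently tracks for a table.
CatalogPartitionSyncOperations ops =
    new HMSCatalogPartitionSyncOperations(metaStoreClient, hmsCatalogConfig);
List<CatalogPartition> partitions = ops.getAllPartitions(tableIdentifier);
partitions.forEach(p -> System.out.println(p.getStorageLocation()));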
xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java
@@ -22,6 +22,7 @@

import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Optional;

import lombok.extern.log4j.Log4j2;

@@ -36,7 +37,10 @@

import com.google.common.annotations.VisibleForTesting;

import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
import org.apache.xtable.catalog.CatalogPartitionSyncTool;
import org.apache.xtable.catalog.CatalogTableBuilder;
import org.apache.xtable.catalog.CatalogUtils;
import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.exception.CatalogSyncException;
import org.apache.xtable.model.InternalTable;
@@ -55,6 +59,7 @@ public class HMSCatalogSyncClient implements CatalogSyncClient<Table> {
private Configuration configuration;
private IMetaStoreClient metaStoreClient;
private CatalogTableBuilder<Table, Table> tableBuilder;
private Optional<CatalogPartitionSyncTool> partitionSyncTool;

// For loading the instance using ServiceLoader
public HMSCatalogSyncClient() {}
@@ -70,12 +75,25 @@ public HMSCatalogSyncClient(
HMSCatalogConfig hmsCatalogConfig,
Configuration configuration,
IMetaStoreClient metaStoreClient,
CatalogTableBuilder tableBuilder) {
CatalogTableBuilder tableBuilder,
Optional<CatalogPartitionSyncTool> partitionSyncTool) {
this.catalogConfig = catalogConfig;
this.hmsCatalogConfig = hmsCatalogConfig;
this.configuration = configuration;
this.metaStoreClient = metaStoreClient;
this.tableBuilder = tableBuilder;
this.partitionSyncTool = partitionSyncTool;
}

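// Builds the optional partition sync tool for this table format; an empty Optional disables partition sync.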
private Optional<CatalogPartitionSyncTool> getPartitionSyncTool(String tableFormat) {
String partitionValueExtractorClass = hmsCatalogConfig.getPartitionExtractorClass();
CatalogPartitionSyncOperations hmsCatalogPartitionSyncOperations =
new HMSCatalogPartitionSyncOperations(metaStoreClient, hmsCatalogConfig);
return CatalogUtils.getPartitionSyncTool(
tableFormat,
partitionValueExtractorClass,
hmsCatalogPartitionSyncOperations,
configuration);
}

@Override
@@ -145,6 +163,8 @@ public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {
} catch (TException e) {
throw new CatalogSyncException("Failed to create table: " + tableIdentifier.getId(), e);
}

partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, tableIdentifier));
}

@Override
@@ -158,6 +178,8 @@ public void refreshTable(
} catch (TException e) {
throw new CatalogSyncException("Failed to refresh table: " + tableIdentifier.getId(), e);
}

partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, tableIdentifier));
}

@Override
@@ -194,7 +216,10 @@ private void _init(
} catch (MetaException | HiveException e) {
throw new CatalogSyncException("HiveMetastoreClient could not be created", e);
}
this.tableBuilder = HMSCatalogTableBuilderFactory.getInstance(tableFormat, this.configuration);
this.tableBuilder =
HMSCatalogTableBuilderFactory.getTableBuilder(
tableFormat, hmsCatalogConfig, this.configuration);
this.partitionSyncTool = getPartitionSyncTool(tableFormat);
}

/**
xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java
@@ -33,20 +33,23 @@
import org.apache.xtable.catalog.CatalogTableBuilder;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.hms.table.DeltaHMSCatalogTableBuilder;
import org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder;
import org.apache.xtable.hms.table.IcebergHMSCatalogTableBuilder;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
import org.apache.xtable.model.storage.TableFormat;

public class HMSCatalogTableBuilderFactory {

static CatalogTableBuilder<Table, Table> getInstance(
String tableFormat, Configuration configuration) {
public static CatalogTableBuilder<Table, Table> getTableBuilder(
String tableFormat, HMSCatalogConfig hmsCatalogConfig, Configuration configuration) {
switch (tableFormat) {
case TableFormat.ICEBERG:
return new IcebergHMSCatalogTableBuilder(configuration);
case TableFormat.DELTA:
return new DeltaHMSCatalogTableBuilder();
case TableFormat.HUDI:
return new HudiHMSCatalogTableBuilder(hmsCatalogConfig, configuration);
default:
throw new NotSupportedException("Unsupported table format: " + tableFormat);
}
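Condensed wiring sketch for the Hudi path, mirroring HMSCatalogSyncClient._init above (variable names are illustrative):

// How the new pieces connect for TableFormat.HUDI (mirrors _init; illustrative).
CatalogTableBuilder<Table, Table> tableBuilder =
    HMSCatalogTableBuilderFactory.getTableBuilder(
        TableFormat.HUDI, hmsCatalogConfig, configuration);
Optional<CatalogPartitionSyncTool> partitionSyncTool =
    CatalogUtils.getPartitionSyncTool(
        TableFormat.HUDI,
        hmsCatalogConfig.getPartitionExtractorClass(),
        new HMSCatalogPartitionSyncOperations(metaStoreClient, hmsCatalogConfig),
        configuration);
// createTable/refreshTable then delegate after the HMS DDL succeeds:
partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, tableIdentifier));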
