Address comments
Vamsi committed Feb 17, 2025
1 parent a15a7b7 commit c3543fa
Showing 7 changed files with 178 additions and 183 deletions.
@@ -33,15 +33,21 @@
import org.apache.xtable.schema.SparkSchemaExtractor;

/** Utility class to fetch catalog table properties for a Hudi table */
public class HudiCatalogTableUtils {
public class HudiCatalogTablePropertiesExtractor {

private static final HudiCatalogTablePropertiesExtractor INSTANCE =
new HudiCatalogTablePropertiesExtractor();

public static HudiCatalogTablePropertiesExtractor getInstance() {
return INSTANCE;
}
/**
* Get Spark SQL related table properties. This is used for Spark datasource tables.
*
* @param schema The schema to write to the table.
* @return The Spark table properties to add to the table parameters.
*/
public static Map<String, String> getSparkTableProperties(
public Map<String, String> getSparkTableProperties(
List<String> partitionNames,
String sparkVersion,
int schemaLengthThreshold,
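For reference, a minimal usage sketch of the renamed extractor (the call mirrors the updated tests below; variable names such as partitionNames, sparkVersion, and schemaLengthThreshold stand for caller-supplied values):

// Before: static utility
// Map<String, String> props =
//     HudiCatalogTableUtils.getSparkTableProperties(
//         partitionNames, sparkVersion, schemaLengthThreshold, schema);

// After: shared singleton instance
Map<String, String> props =
    HudiCatalogTablePropertiesExtractor.getInstance()
        .getSparkTableProperties(partitionNames, sparkVersion, schemaLengthThreshold, schema);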
@@ -32,7 +32,7 @@
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;

public class TestHudiCatalogTableUtils {
public class TestHudiCatalogTablePropertiesExtractor {

@Test
void testGetSparkTableProperties() {
@@ -81,8 +81,8 @@ void testGetSparkTableProperties() {
.build();

Map<String, String> result =
HudiCatalogTableUtils.getSparkTableProperties(
partitionNames, sparkVersion, schemaLengthThreshold, schema);
HudiCatalogTablePropertiesExtractor.getInstance()
.getSparkTableProperties(partitionNames, sparkVersion, schemaLengthThreshold, schema);

// Validate results
assertEquals("hudi", result.get("spark.sql.sources.provider"));
@@ -127,8 +127,8 @@ void testGetSparkTablePropertiesEmptyPartitions() {

// Call the method
Map<String, String> result =
HudiCatalogTableUtils.getSparkTableProperties(
partitionNames, "", schemaLengthThreshold, schema);
HudiCatalogTablePropertiesExtractor.getInstance()
.getSparkTableProperties(partitionNames, "", schemaLengthThreshold, schema);

assertEquals("hudi", result.get("spark.sql.sources.provider"));
assertNull(result.get("spark.sql.create.version"));
@@ -28,6 +28,7 @@
import lombok.extern.log4j.Log4j2;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
@@ -76,42 +77,34 @@ public void addPartitionsToTable(
HierarchicalTableIdentifier tableIdentifier =
toHierarchicalTableIdentifier(catalogTableIdentifier);
if (partitionsToAdd.isEmpty()) {
log.info("No partitions to add for " + tableIdentifier);
log.info("No partitions to add for {}", tableIdentifier);
return;
}
log.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableIdentifier);
log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableIdentifier);
try {
StorageDescriptor sd =
metaStoreClient
.getTable(tableIdentifier.getDatabaseName(), tableIdentifier.getTableName())
.getSd();
for (List<CatalogPartition> batch :
CollectionUtils.batches(partitionsToAdd, catalogConfig.getMaxPartitionsPerRequest())) {
List<org.apache.hadoop.hive.metastore.api.Partition> partitionList = new ArrayList<>();
batch.forEach(
partition -> {
StorageDescriptor partitionSd = new StorageDescriptor();
partitionSd.setCols(sd.getCols());
partitionSd.setInputFormat(sd.getInputFormat());
partitionSd.setOutputFormat(sd.getOutputFormat());
partitionSd.setSerdeInfo(sd.getSerdeInfo());

partitionSd.setLocation(partition.getStorageLocation());
partitionList.add(
new org.apache.hadoop.hive.metastore.api.Partition(
partition.getValues(),
tableIdentifier.getDatabaseName(),
tableIdentifier.getTableName(),
0,
0,
partitionSd,
null));
});
metaStoreClient.add_partitions(partitionList, true, false);
log.info("Add batch partitions done: " + partitionList.size());
}

CollectionUtils.batches(partitionsToAdd, catalogConfig.getMaxPartitionsPerRequest())
.forEach(
batch -> {
List<Partition> partitionList = new ArrayList<>();
batch.forEach(
partition -> {
partitionList.add(createPartition(tableIdentifier, partition, sd));
});
try {
metaStoreClient.add_partitions(partitionList, true, false);
} catch (TException e) {
log.error("{} add partition failed", tableIdentifier, e);
throw new CatalogSyncException(tableIdentifier + " add partition failed", e);
}
log.info("Add batch partitions done: {}", partitionList.size());
});
} catch (TException e) {
log.error("{} add partition failed", tableIdentifier, e);
log.error("Failed to add partitions to table {}", tableIdentifier, e);
throw new CatalogSyncException(tableIdentifier + " add partition failed", e);
}
}
@@ -127,31 +120,16 @@ public void updatePartitionsToTable(
tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
StorageDescriptor tableSd = table.getSd();

List<org.apache.hadoop.hive.metastore.api.Partition> updatedPartitions = new ArrayList<>();
List<Partition> updatedPartitions = new ArrayList<>();

changedPartitions.forEach(
p -> {
StorageDescriptor partitionSd = new StorageDescriptor(tableSd);
partitionSd.setLocation(p.getStorageLocation());

org.apache.hadoop.hive.metastore.api.Partition partition =
new org.apache.hadoop.hive.metastore.api.Partition();
partition.setDbName(tableIdentifier.getDatabaseName());
partition.setTableName(tableIdentifier.getTableName());
partition.setValues(p.getValues());
partition.setSd(partitionSd);
updatedPartitions.add(partition);
partition -> {
updatedPartitions.add(createPartition(tableIdentifier, partition, tableSd));
});

// Update partitions (drop existing and add new ones with updated locations)
for (org.apache.hadoop.hive.metastore.api.Partition partition : updatedPartitions) {
metaStoreClient.dropPartition(
tableIdentifier.getDatabaseName(),
tableIdentifier.getTableName(),
partition.getValues(),
false);
metaStoreClient.add_partition(partition);
}
// Update partitions
metaStoreClient.alter_partitions(
tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), updatedPartitions);
} catch (TException e) {
throw new CatalogSyncException(
"Failed to update partitions for the table " + tableIdentifier, e);
@@ -219,4 +197,25 @@ public void updateTableProperties(
"failed to update last time synced properties for table" + tableIdentifier, e);
}
}

private Partition createPartition(
HierarchicalTableIdentifier tableIdentifier,
CatalogPartition partition,
StorageDescriptor sd) {
StorageDescriptor partitionSd = new StorageDescriptor();
partitionSd.setCols(sd.getCols());
partitionSd.setInputFormat(sd.getInputFormat());
partitionSd.setOutputFormat(sd.getOutputFormat());
partitionSd.setSerdeInfo(sd.getSerdeInfo());
partitionSd.setLocation(partition.getStorageLocation());

return new Partition(
partition.getValues(),
tableIdentifier.getDatabaseName(),
tableIdentifier.getTableName(),
0,
0,
partitionSd,
null);
}
}
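A condensed view of the partition-update change, assuming dbName, tableName, and updatedPartitions hold the values built above: the per-partition drop-and-re-add round trips are replaced by a single metastore call, so a partition is never transiently missing during sync.

// Old flow: two metastore calls per changed partition
// metaStoreClient.dropPartition(dbName, tableName, partition.getValues(), false);
// metaStoreClient.add_partition(partition);

// New flow: one call updates all changed partitions in place
metaStoreClient.alter_partitions(dbName, tableName, updatedPartitions);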
@@ -21,7 +21,7 @@
import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,7 +48,7 @@
import org.apache.xtable.hms.HMSCatalogConfig;
import org.apache.xtable.hms.HMSSchemaExtractor;
import org.apache.xtable.hudi.HudiTableManager;
import org.apache.xtable.hudi.catalog.HudiCatalogTableUtils;
import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor;
import org.apache.xtable.hudi.catalog.HudiInputFormatUtils;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
@@ -63,6 +63,7 @@ public class HudiHMSCatalogTableBuilder implements CatalogTableBuilder<Table, Ta
private final HudiTableManager hudiTableManager;
private final HMSSchemaExtractor schemaExtractor;
private final HMSCatalogConfig hmsCatalogConfig;
private final HudiCatalogTablePropertiesExtractor tablePropertiesExtractor;

private HoodieTableMetaClient metaClient;

@@ -73,18 +74,21 @@ public HudiHMSCatalogTableBuilder(
this.hudiTableManager = HudiTableManager.of(configuration);
this.schemaExtractor = HMSSchemaExtractor.getInstance();
this.hmsCatalogConfig = hmsCatalogConfig;
this.tablePropertiesExtractor = HudiCatalogTablePropertiesExtractor.getInstance();
}

@VisibleForTesting
HudiHMSCatalogTableBuilder(
HMSCatalogConfig hmsCatalogConfig,
HMSSchemaExtractor schemaExtractor,
HudiTableManager hudiTableManager,
HoodieTableMetaClient metaClient) {
HoodieTableMetaClient metaClient,
HudiCatalogTablePropertiesExtractor propertiesExtractor) {
this.hudiTableManager = hudiTableManager;
this.schemaExtractor = schemaExtractor;
this.metaClient = metaClient;
this.hmsCatalogConfig = hmsCatalogConfig;
this.tablePropertiesExtractor = propertiesExtractor;
}

HoodieTableMetaClient getMetaClient(String basePath) {
Expand All @@ -94,7 +98,7 @@ HoodieTableMetaClient getMetaClient(String basePath) {

if (!metaClientOpt.isPresent()) {
throw new CatalogSyncException(
"failed to get meta client since table is not present in the base path " + basePath);
"Failed to get meta client since table is not present in the base path " + basePath);
}

metaClient = metaClientOpt.get();
@@ -117,13 +121,8 @@ public Table getCreateTableRequest(
"Failed to set owner for hms table: " + tableIdentifier.getTableName(), e);
}

newTb.setCreateTime((int) ZonedDateTime.now().toEpochSecond());
List<String> partitionFields =
table.getPartitioningFields().stream()
.map(field -> field.getSourceField().getName())
.collect(Collectors.toList());
Map<String, String> tableProperties =
getTableProperties(partitionFields, table.getReadSchema());
newTb.setCreateTime((int) Instant.now().getEpochSecond());
Map<String, String> tableProperties = getTableProperties(table, table.getReadSchema());
newTb.setParameters(tableProperties);
newTb.setSd(getStorageDescriptor(table));
newTb.setPartitionKeys(getSchemaPartitionKeys(table));
@@ -134,12 +133,8 @@ public Table getUpdateTableRequest(
public Table getUpdateTableRequest(
InternalTable table, Table hmsTable, CatalogTableIdentifier tableIdentifier) {
Map<String, String> parameters = hmsTable.getParameters();
List<String> partitionFields =
table.getPartitioningFields().stream()
.map(field -> field.getSourceField().getName())
.collect(Collectors.toList());
Map<String, String> tableParameters = hmsTable.getParameters();
tableParameters.putAll(getTableProperties(partitionFields, table.getReadSchema()));
tableParameters.putAll(getTableProperties(table, table.getReadSchema()));
hmsTable.setParameters(tableParameters);
hmsTable.setSd(getStorageDescriptor(table));

@@ -148,19 +143,21 @@ public Table getUpdateTableRequest(
return hmsTable;
}

private Map<String, String> getTableProperties(
List<String> partitionFields, InternalSchema schema) {
private Map<String, String> getTableProperties(InternalTable table, InternalSchema schema) {
List<String> partitionFields =
table.getPartitioningFields().stream()
.map(field -> field.getSourceField().getName())
.collect(Collectors.toList());
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put(HUDI_METADATA_CONFIG, "true");
Map<String, String> sparkTableProperties =
HudiCatalogTableUtils.getSparkTableProperties(
tablePropertiesExtractor.getSparkTableProperties(
partitionFields, "", hmsCatalogConfig.getSchemaLengthThreshold(), schema);
tableProperties.putAll(sparkTableProperties);
return tableProperties;
}

@VisibleForTesting
StorageDescriptor getStorageDescriptor(InternalTable table) {
private StorageDescriptor getStorageDescriptor(InternalTable table) {
final StorageDescriptor storageDescriptor = new StorageDescriptor();
storageDescriptor.setCols(schemaExtractor.toColumns(TableFormat.HUDI, table.getReadSchema()));
storageDescriptor.setLocation(table.getBasePath());
@@ -171,22 +168,22 @@ StorageDescriptor getStorageDescriptor(InternalTable table) {
String serdeClassName = HudiInputFormatUtils.getSerDeClassName(fileFormat);
storageDescriptor.setInputFormat(inputFormatClassName);
storageDescriptor.setOutputFormat(outputFormatClassName);
Map<String, String> serdeProperties = getSerdeProperties(false, table.getBasePath());
Map<String, String> serdeProperties = getSerdeProperties(table.getBasePath());
SerDeInfo serDeInfo = new SerDeInfo();
serDeInfo.setSerializationLib(serdeClassName);
serDeInfo.setParameters(serdeProperties);
storageDescriptor.setSerdeInfo(serDeInfo);
return storageDescriptor;
}

private static Map<String, String> getSerdeProperties(boolean readAsOptimized, String basePath) {
private static Map<String, String> getSerdeProperties(String basePath) {
Map<String, String> serdeProperties = new HashMap<>();
serdeProperties.put(ConfigUtils.TABLE_SERDE_PATH, basePath);
serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));
serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(false));
return serdeProperties;
}

List<FieldSchema> getSchemaPartitionKeys(InternalTable table) {
private List<FieldSchema> getSchemaPartitionKeys(InternalTable table) {

List<InternalPartitionField> partitioningFields = table.getPartitioningFields();
Map<String, FieldSchema> fieldSchemaMap =
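With the properties extractor injected through the @VisibleForTesting constructor, tests can stub getSparkTableProperties directly. A minimal sketch, assuming Mockito and a test class in the same package as the builder (the mock variable names are illustrative only):

import static org.mockito.Mockito.mock;

HMSCatalogConfig mockCatalogConfig = mock(HMSCatalogConfig.class);
HudiTableManager mockTableManager = mock(HudiTableManager.class);
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
HudiCatalogTablePropertiesExtractor mockPropertiesExtractor =
    mock(HudiCatalogTablePropertiesExtractor.class);

HudiHMSCatalogTableBuilder tableBuilder =
    new HudiHMSCatalogTableBuilder(
        mockCatalogConfig,
        HMSSchemaExtractor.getInstance(),
        mockTableManager,
        mockMetaClient,
        mockPropertiesExtractor);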
