GH-3035: ParquetRewriter: Add a column renaming feature #3036

Merged
@@ -109,7 +109,7 @@
* Please note the schema of all <code>inputFiles</code> must be the same, otherwise the rewrite will fail.
* <p>
* <h2>Applying column transformations</h2>
* Some supported column transformations: pruning, masking, encrypting, changing a codec.
* Some supported column transformations: pruning, masking, renaming, encrypting, changing a codec.
* See {@link RewriteOptions} and {@link RewriteOptions.Builder} for the full list with description.
* <p>
* <h2><i>Joining</i> with extra files with a different schema</h2>
@@ -145,15 +145,18 @@ public class ParquetRewriter implements Closeable {
private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
private final Queue<TransParquetFileReader> inputFilesToJoin = new LinkedList<>();
private final MessageType outSchema;
private final MessageType outSchemaWithRenamedColumns;
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
private final boolean overwriteInputWithJoinColumns;
private final InternalFileEncryptor nullColumnEncryptor;
private final Map<String, String> renamedColumns;

public ParquetRewriter(RewriteOptions options) throws IOException {
this.newCodecName = options.getNewCodecName();
this.indexCacheStrategy = options.getIndexCacheStrategy();
this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns();
this.renamedColumns = options.getRenameColumns();
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
@@ -169,6 +172,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
out);

this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns());
// TODO: check that all renamed columns are present in outSchema
this.outSchemaWithRenamedColumns = getSchemaWithRenamedColumns(this.outSchema);
this.extraMetaData = getExtraMetadata(options);

if (options.getMaskColumns() != null) {
@@ -186,7 +191,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
writer = new ParquetFileWriter(
out,
outSchema,
outSchemaWithRenamedColumns != null ? outSchemaWithRenamedColumns : outSchema,
writerMode,
DEFAULT_BLOCK_SIZE,
MAX_PADDING_SIZE_DEFAULT,
@@ -222,6 +227,7 @@ public ParquetRewriter(
MaskMode maskMode) {
this.writer = writer;
this.outSchema = outSchema;
this.outSchemaWithRenamedColumns = outSchema;
this.newCodecName = codecName;
extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
extraMetaData.put(
@@ -239,6 +245,7 @@ public ParquetRewriter(
this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
this.overwriteInputWithJoinColumns = false;
this.nullColumnEncryptor = null;
this.renamedColumns = new HashMap<>();
}

private MessageType getSchema() {
@@ -266,6 +273,27 @@ private MessageType getSchema() {
}
}

private MessageType getSchemaWithRenamedColumns(MessageType schema) {
List<Type> fields = schema.getFields().stream()
.map(type -> {
if (renamedColumns == null || !renamedColumns.containsKey(type.getName())) {
return type;
} else if (type.isPrimitive()) {
return new PrimitiveType(
type.getRepetition(),
type.asPrimitiveType().getPrimitiveTypeName(),
renamedColumns.get(type.getName()));
} else {
return new GroupType(
type.getRepetition(),
renamedColumns.get(type.getName()),
type.asGroupType().getFields());
}
})
.collect(Collectors.toList());
return new MessageType(schema.getName(), fields);
}
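
For illustration, a minimal sketch (hypothetical schema and rename map, not part of this diff) of what `getSchemaWithRenamedColumns` produces: only top-level field names are rewritten, and fields nested inside a renamed group keep their own names.

```java
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

MessageType schema = MessageTypeParser.parseMessageType(
    "message record {"
        + "  required int64 id;"
        + "  optional group address { required binary street; }"
        + "}");

// With renamedColumns = {"id" -> "user_id", "address" -> "home_address"},
// getSchemaWithRenamedColumns(schema) is equivalent to:
//
//   message record {
//     required int64 user_id;
//     optional group home_address {
//       required binary street;
//     }
//   }
```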

private Map<String, String> getExtraMetadata(RewriteOptions options) {
List<TransParquetFileReader> allFiles;
if (options.getIgnoreJoinFilesMetadata()) {
@@ -421,6 +449,27 @@ public void processBlocks() throws IOException {
if (readerToJoin != null) readerToJoin.close();
}

private ColumnPath renameFieldsInPath(ColumnPath path) {
if (renamedColumns == null) {
return path;
} else {
String[] pathArray = path.toArray();
pathArray[0] = renamedColumns.getOrDefault(pathArray[0], pathArray[0]);
return ColumnPath.get(pathArray);
}
}

private PrimitiveType renameNameInType(PrimitiveType type) {
if (renamedColumns == null) {
return type;
} else {
return new PrimitiveType(
type.getRepetition(),
type.asPrimitiveType().getPrimitiveTypeName(),
renamedColumns.getOrDefault(type.getName(), type.getName()));
}
}
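
A behavior sketch for the two helpers above (the map contents are hypothetical, and the private methods are called directly purely for illustration): only the first, top-level element of a column path, and a type's own name, are looked up in the rename map.

```java
import org.apache.parquet.hadoop.metadata.ColumnPath;

// Assuming renamedColumns = {"address" -> "home_address"}:
ColumnPath renamed = renameFieldsInPath(ColumnPath.fromDotString("address.street"));
assert renamed.toDotString().equals("home_address.street");

// A path whose first element has no mapping is returned unchanged:
assert renameFieldsInPath(ColumnPath.fromDotString("id")).toDotString().equals("id");

// renameNameInType(...) likewise returns a copy of a PrimitiveType with the
// mapped name, preserving its repetition and primitive type name.
```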

private void processBlock(
TransParquetFileReader reader,
int blockIdx,
@@ -431,7 +480,27 @@ private void processBlock(
if (chunk.isEncrypted()) {
throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
}
ColumnDescriptor descriptor = outSchema.getColumns().get(outColumnIdx);

ColumnChunkMetaData chunkColumnsRenamed = chunk;
if (renamedColumns != null && !renamedColumns.isEmpty()) {
chunkColumnsRenamed = ColumnChunkMetaData.get(
renameFieldsInPath(chunk.getPath()),
renameNameInType(chunk.getPrimitiveType()),
chunk.getCodec(),
chunk.getEncodingStats(),
chunk.getEncodings(),
chunk.getStatistics(),
chunk.getFirstDataPageOffset(),
chunk.getDictionaryPageOffset(),
chunk.getValueCount(),
chunk.getTotalSize(),
chunk.getTotalUncompressedSize(),
chunk.getSizeStatistics());
}

ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx);
ColumnDescriptor descriptorRenamed =
outSchemaWithRenamedColumns.getColumns().get(outColumnIdx);
BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx);
String originalCreatedBy = reader.getFileMetaData().getCreatedBy();

@@ -443,13 +512,21 @@ private void processBlock(
// Mask column and compress it again.
MaskMode maskMode = maskColumns.get(chunk.getPath());
if (maskMode.equals(MaskMode.NULLIFY)) {
Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition();
Type.Repetition repetition =
descriptorOriginal.getPrimitiveType().getRepetition();
if (repetition.equals(Type.Repetition.REQUIRED)) {
throw new IOException(
"Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
throw new IOException("Required column ["
+ descriptorOriginal.getPrimitiveType().getName() + "] cannot be nullified");
}
nullifyColumn(
reader, blockIdx, descriptor, chunk, writer, newCodecName, encryptColumn, originalCreatedBy);
reader,
blockIdx,
descriptorOriginal,
chunk,
writer,
newCodecName,
encryptColumn,
originalCreatedBy);
} else {
throw new UnsupportedOperationException("Only nullify is supported for now");
}
@@ -462,7 +539,7 @@ private void processBlock(
}

// Translate compression and/or encryption
writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
writer.startColumn(descriptorRenamed, chunk.getValueCount(), newCodecName);
processChunk(
reader,
blockMetaData.getRowCount(),
@@ -480,7 +557,8 @@ private void processChunk(
BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
writer.appendColumnChunk(
descriptorRenamed, reader.getStream(), chunkColumnsRenamed, bloomFilter, columnIndex, offsetIndex);
}
}

@@ -522,7 +600,7 @@ private void processChunk(
}

if (bloomFilter != null) {
writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter);
writer.addBloomFilter(renameFieldsInPath(chunk.getPath()).toDotString(), bloomFilter);
}

reader.setStreamPosition(chunk.getStartingPos());
@@ -580,7 +658,7 @@ private void processChunk(
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy,
chunk.getPrimitiveType(),
renameNameInType(chunk.getPrimitiveType()),
headerV1.getStatistics(),
columnIndex,
pageOrdinal,
@@ -648,7 +726,7 @@ private void processChunk(
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy,
chunk.getPrimitiveType(),
renameNameInType(chunk.getPrimitiveType()),
headerV2.getStatistics(),
columnIndex,
pageOrdinal,
@@ -887,7 +965,7 @@ private void nullifyColumn(
CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(newCodecName);

// Create new schema that only has the current column
MessageType newSchema = newSchema(outSchema, descriptor);
MessageType newSchema = getSchemaWithRenamedColumns(newSchema(outSchema, descriptor));
ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
compressor,
newSchema,
@@ -49,6 +49,7 @@ public class RewriteOptions {
private final List<String> pruneColumns;
private final CompressionCodecName newCodecName;
private final Map<String, MaskMode> maskColumns;
private final Map<String, String> renameColumns;
private final List<String> encryptColumns;
private final FileEncryptionProperties fileEncryptionProperties;
private final IndexCache.CacheStrategy indexCacheStrategy;
@@ -63,6 +64,7 @@ private RewriteOptions(
List<String> pruneColumns,
CompressionCodecName newCodecName,
Map<String, MaskMode> maskColumns,
Map<String, String> renameColumns,
List<String> encryptColumns,
FileEncryptionProperties fileEncryptionProperties,
IndexCache.CacheStrategy indexCacheStrategy,
@@ -75,6 +77,7 @@ private RewriteOptions(
this.pruneColumns = pruneColumns;
this.newCodecName = newCodecName;
this.maskColumns = maskColumns;
this.renameColumns = renameColumns;
this.encryptColumns = encryptColumns;
this.fileEncryptionProperties = fileEncryptionProperties;
this.indexCacheStrategy = indexCacheStrategy;
@@ -192,6 +195,10 @@ public Map<String, MaskMode> getMaskColumns() {
return maskColumns;
}

public Map<String, String> getRenameColumns() {
return renameColumns;
}

public List<String> getEncryptColumns() {
return encryptColumns;
}
Expand Down Expand Up @@ -221,6 +228,7 @@ public static class Builder {
private List<String> pruneColumns;
private CompressionCodecName newCodecName;
private Map<String, MaskMode> maskColumns;
private Map<String, String> renameColumns;
private List<String> encryptColumns;
private FileEncryptionProperties fileEncryptionProperties;
private IndexCache.CacheStrategy indexCacheStrategy = IndexCache.CacheStrategy.NONE;
@@ -432,6 +440,19 @@ public Builder mask(Map<String, MaskMode> maskColumns) {
return this;
}

/**
* Set the columns to be renamed.
* <p>
* Note that nested columns can't be renamed; for a GroupType column, only the top-level name can be renamed.
*
* @param renameColumns map where keys are original names and values are new names
* @return self
*/
public Builder renameColumns(Map<String, String> renameColumns) {
this.renameColumns = renameColumns;
return this;
}
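
A minimal end-to-end usage sketch for this option (file paths are hypothetical): rewrite one file while renaming the top-level column `id` to `user_id`.

```java
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

RewriteOptions options = new RewriteOptions.Builder(
        new Configuration(),
        new Path("/tmp/input.parquet"),
        new Path("/tmp/output.parquet"))
    .renameColumns(Collections.singletonMap("id", "user_id"))
    .build();

try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
  rewriter.processBlocks();
}
```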

/**
* Set the columns to encrypt.
* <p>
Expand Down Expand Up @@ -561,13 +582,29 @@ public RewriteOptions build() {
!maskColumns.containsKey(pruneColumn), "Cannot prune and mask same column");
}
}

if (encryptColumns != null) {
for (String pruneColumn : pruneColumns) {
Preconditions.checkArgument(
!encryptColumns.contains(pruneColumn), "Cannot prune and encrypt same column");
}
}
if (renameColumns != null) {
for (Map.Entry<String, String> entry : renameColumns.entrySet()) {
Preconditions.checkArgument(
!pruneColumns.contains(entry.getKey()), "Cannot prune and rename same column");
}
}
}

if (renameColumns != null && !renameColumns.isEmpty()) {
for (Map.Entry<String, String> entry : renameColumns.entrySet()) {
Preconditions.checkArgument(
entry.getValue() != null && !entry.getValue().trim().isEmpty(),
"Renamed column target name can't be empty");
Preconditions.checkArgument(
!entry.getKey().contains(".") && !entry.getValue().contains("."),
"Renamed column name can't be nested, in case of GroupType column only a top level column can be renamed");
}
}
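
To make the contract above concrete, a few hypothetical rename maps and how `build()` treats them under these checks (`builder` stands for any configured `RewriteOptions.Builder`):

```java
// Accepted: a top-level column renamed to a non-empty, non-nested name.
builder.renameColumns(Collections.singletonMap("id", "user_id")).build();

// Rejected: blank target name ("Renamed column target name can't be empty").
builder.renameColumns(Collections.singletonMap("id", " ")).build();

// Rejected: dotted names on either side, since nested renames aren't supported.
builder.renameColumns(Collections.singletonMap("address.street", "s")).build();
builder.renameColumns(Collections.singletonMap("id", "a.b")).build();
```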

if (encryptColumns != null && !encryptColumns.isEmpty()) {
Expand All @@ -590,6 +627,7 @@ public RewriteOptions build() {
pruneColumns,
newCodecName,
maskColumns,
renameColumns,
encryptColumns,
fileEncryptionProperties,
indexCacheStrategy,