maskColumns) {
return this;
}
+ /**
+ * Set the columns to be renamed.
+ *
+ * Note that nested columns can't be renamed; for a GroupType column, only the top-level column can be renamed.
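+ *
+ * <p>For example, {@code builder.renameColumns(ImmutableMap.of("Name", "NameRenamed"))} writes the
+ * column {@code Name} to the output file under the name {@code NameRenamed}.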
+ *
+ * @param renameColumns map where keys are original names and values are new names
+ * @return self
+ */
+ public Builder renameColumns(Map<String, String> renameColumns) {
+ this.renameColumns = renameColumns;
+ return this;
+ }
+
/**
* Set the columns to encrypt.
*
@@ -551,6 +575,28 @@ public Builder ignoreJoinFilesMetadata(boolean ignoreJoinFilesMetadata) {
* @return a RewriterOptions
*/
public RewriteOptions build() {
+ checkPreconditions();
+ return new RewriteOptions(
+ conf,
+ inputFiles,
+ (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
+ outputFile,
+ pruneColumns,
+ newCodecName,
+ maskColumns,
+ renameColumns == null
+ ? new HashMap<>()
+ : renameColumns.entrySet().stream()
+ .collect(Collectors.toMap(x -> x.getKey().trim(), x -> x.getValue()
+ .trim())),
+ encryptColumns,
+ fileEncryptionProperties,
+ indexCacheStrategy,
+ overwriteInputWithJoinColumns,
+ ignoreJoinFilesMetadata);
+ }
+
+ private void checkPreconditions() {
Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "Input file is required");
Preconditions.checkArgument(outputFile != null, "Output file is required");
@@ -561,7 +607,6 @@ public RewriteOptions build() {
!maskColumns.containsKey(pruneColumn), "Cannot prune and mask same column");
}
}
-
if (encryptColumns != null) {
for (String pruneColumn : pruneColumns) {
Preconditions.checkArgument(
@@ -570,6 +615,26 @@ public RewriteOptions build() {
}
}
+ if (renameColumns != null) {
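+ // Columns masked with MaskMode.NULLIFY can't also be renamed, so collect them for the check below.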
+ Set<String> nullifiedColumns = maskColumns == null
+ ? new HashSet<>()
+ : maskColumns.entrySet().stream()
+ .filter(x -> x.getValue() == MaskMode.NULLIFY)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ renameColumns.forEach((colSrc, colDst) -> {
+ Preconditions.checkArgument(
+ colSrc != null && !colSrc.trim().isEmpty(), "Renamed column source name can't be empty");
+ Preconditions.checkArgument(
+ colDst != null && !colDst.trim().isEmpty(), "Renamed column target name can't be empty");
+ Preconditions.checkArgument(
+ !nullifiedColumns.contains(colSrc), "Cannot nullify and rename the same column");
+ Preconditions.checkArgument(
+ !colSrc.contains(".") && !colDst.contains("."),
+ "Renamed column can't be nested, in case of GroupType column only a top level column can be renamed");
+ });
+ }
+
if (encryptColumns != null && !encryptColumns.isEmpty()) {
Preconditions.checkArgument(
fileEncryptionProperties != null,
@@ -581,20 +646,6 @@ public RewriteOptions build() {
encryptColumns != null && !encryptColumns.isEmpty(),
"Encrypt columns is required when FileEncryptionProperties is set");
}
-
- return new RewriteOptions(
- conf,
- inputFiles,
- (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
- outputFile,
- pruneColumns,
- newCodecName,
- maskColumns,
- encryptColumns,
- fileEncryptionProperties,
- indexCacheStrategy,
- overwriteInputWithJoinColumns,
- ignoreJoinFilesMetadata);
}
}
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 34c90a4641..c1da97c403 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.hadoop.rewrite;
+import static java.util.Collections.emptyMap;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
@@ -181,10 +182,10 @@ private void testPruneSingleColumnTranslateCodec(List inputPaths) throws E
null);
// Verify the data are not changed for the columns not pruned
- validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false);
+ validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false, emptyMap());
// Verify the page index
- validatePageIndex(new HashSet<>(), false);
+ validatePageIndex(new HashSet<>(), false, emptyMap());
// Verify original.created.by is preserved
validateCreatedBy();
@@ -199,7 +200,7 @@ public void setUp() {
@Test
public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception {
- ensureContainsGzipFile();
+ addGzipInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -210,8 +211,8 @@ public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception {
@Test
public void testPruneSingleColumnTranslateCodecTwoFiles() throws Exception {
- ensureContainsGzipFile();
- ensureContainsUncompressedFile();
+ addGzipInputFile();
+ addUncompressedInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -252,10 +253,10 @@ private void testPruneNullifyTranslateCodec(List inputPaths) throws Except
null);
// Verify the data are not changed for the columns not pruned
- validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), null, false);
+ validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), null, false, emptyMap());
// Verify the page index
- validatePageIndex(ImmutableSet.of("Links.Forward"), false);
+ validatePageIndex(ImmutableSet.of("Links.Forward"), false, emptyMap());
// Verify original.created.by is preserved
validateCreatedBy();
@@ -264,7 +265,7 @@ private void testPruneNullifyTranslateCodec(List inputPaths) throws Except
@Test
public void testPruneNullifyTranslateCodecSingleFile() throws Exception {
- ensureContainsGzipFile();
+ addGzipInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -276,8 +277,8 @@ public void testPruneNullifyTranslateCodecSingleFile() throws Exception {
@Test
public void testPruneNullifyTranslateCodecTwoFiles() throws Exception {
- ensureContainsGzipFile();
- ensureContainsUncompressedFile();
+ addGzipInputFile();
+ addUncompressedInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -327,7 +328,8 @@ private void testPruneEncryptTranslateCodec(List inputPaths) throws Except
fileDecryptionProperties);
// Verify the data are not changed for the columns not pruned
- validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false);
+ validateColumnData(
+ new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false, emptyMap());
// Verify column encryption
ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties);
@@ -349,7 +351,7 @@ private void testPruneEncryptTranslateCodec(List inputPaths) throws Except
@Test
public void testPruneEncryptTranslateCodecSingleFile() throws Exception {
- ensureContainsGzipFile();
+ addGzipInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -361,8 +363,8 @@ public void testPruneEncryptTranslateCodecSingleFile() throws Exception {
@Test
public void testPruneEncryptTranslateCodecTwoFiles() throws Exception {
- ensureContainsGzipFile();
- ensureContainsUncompressedFile();
+ addGzipInputFile();
+ addUncompressedInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -488,10 +490,10 @@ private void testNullifyAndEncryptColumn(List inputPaths) throws Exception
// Verify the data are not changed for non-encrypted and non-masked columns.
// Also make sure the masked column is nullified.
- validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties, false);
+ validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties, false, emptyMap());
// Verify the page index
- validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false);
+ validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false, emptyMap());
// Verify the column is encrypted
ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties);
@@ -511,7 +513,7 @@ private void testNullifyAndEncryptColumn(List inputPaths) throws Exception
@Test
public void testNullifyEncryptSingleFile() throws Exception {
- ensureContainsGzipFile();
+ addGzipInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -523,8 +525,8 @@ public void testNullifyEncryptSingleFile() throws Exception {
@Test
public void testNullifyEncryptTwoFiles() throws Exception {
- ensureContainsGzipFile();
- ensureContainsUncompressedFile();
+ addGzipInputFile();
+ addUncompressedInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -537,8 +539,8 @@ public void testNullifyEncryptTwoFiles() throws Exception {
@Test
public void testMergeTwoFilesOnly() throws Exception {
- ensureContainsGzipFile();
- ensureContainsUncompressedFile();
+ addGzipInputFile();
+ addUncompressedInputFile();
// Only merge two files but do not change anything.
List<Path> inputPaths = new ArrayList<>();
@@ -571,27 +573,103 @@ public void testMergeTwoFilesOnly() throws Exception {
null);
// Verify the merged data are not changed
- validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false);
+ validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false, emptyMap());
// Verify the page index
- validatePageIndex(new HashSet<>(), false);
+ validatePageIndex(new HashSet<>(), false, emptyMap());
// Verify original.created.by is preserved
validateCreatedBy();
validateRowGroupRowCount();
}
+ @Test
+ public void testMergeTwoFilesOnlyRenameColumn() throws Exception {
+ addGzipInputFile();
+ addUncompressedInputFile();
+
+ Map<String, String> renameColumns = ImmutableMap.of("Name", "NameRenamed");
+ List<String> pruneColumns = ImmutableList.of("Gender");
+ String[] encryptColumns = {"DocId"};
+ FileEncryptionProperties fileEncryptionProperties =
+ EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
+ List<Path> inputPaths =
+ inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList());
+ RewriteOptions.Builder builder = createBuilder(inputPaths);
+ RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy)
+ .renameColumns(renameColumns)
+ .prune(pruneColumns)
+ .transform(CompressionCodecName.SNAPPY)
+ .encrypt(Arrays.asList(encryptColumns))
+ .encryptionProperties(fileEncryptionProperties)
+ .build();
+
+ rewriter = new ParquetRewriter(options);
+ rewriter.processBlocks();
+ rewriter.close();
+
+ FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();
+
+ // Verify the schema reflects the pruned and renamed columns
+ ParquetMetadata pmd =
+ ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+ MessageType schema = pmd.getFileMetaData().getSchema();
+ MessageType expectSchema = createSchemaWithRenamed();
+ assertEquals(expectSchema, schema);
+
+ verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.SNAPPY), fileDecryptionProperties); // Verify codec
+ // Verify the merged data are not changed
+ validateColumnData(
+ new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false, renameColumns);
+ validatePageIndex(ImmutableSet.of("DocId"), false, renameColumns); // Verify the page index
+ validateCreatedBy(); // Verify original.created.by is preserved
+ validateRowGroupRowCount();
+
+ ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties);
+ assertFalse(metaData.getBlocks().isEmpty());
+ Set<String> encryptedColumns = new HashSet<>(Arrays.asList(encryptColumns));
+ for (BlockMetaData blockMetaData : metaData.getBlocks()) {
+ List<ColumnChunkMetaData> columns = blockMetaData.getColumns();
+ for (ColumnChunkMetaData column : columns) {
+ if (encryptedColumns.contains(column.getPath().toDotString())) {
+ assertTrue(column.isEncrypted());
+ } else {
+ assertFalse(column.isEncrypted());
+ }
+ }
+ }
+ }
+
@Test(expected = InvalidSchemaException.class)
public void testMergeTwoFilesWithDifferentSchema() throws Exception {
- testMergeTwoFilesWithDifferentSchemaSetup(true);
+ testMergeTwoFilesWithDifferentSchemaSetup(true, null, null);
}
@Test(expected = InvalidSchemaException.class)
public void testMergeTwoFilesToJoinWithDifferentSchema() throws Exception {
- testMergeTwoFilesWithDifferentSchemaSetup(false);
+ testMergeTwoFilesWithDifferentSchemaSetup(false, null, null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMergeTwoFilesWithWrongSourceRenamedColumn() throws Exception {
+ testMergeTwoFilesWithDifferentSchemaSetup(
+ null, ImmutableMap.of("WrongColumnName", "WrongColumnNameRenamed"), null);
}
- public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInputFile) throws Exception {
+ @Test(expected = IllegalArgumentException.class)
+ public void testMergeTwoFilesWithWrongDestinationRenamedColumn() throws Exception {
+ testMergeTwoFilesWithDifferentSchemaSetup(null, ImmutableMap.of("Name", "DocId"), null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMergeTwoFilesNullifyAndRenamedSameColumn() throws Exception {
+ testMergeTwoFilesWithDifferentSchemaSetup(
+ null, ImmutableMap.of("Name", "NameRenamed"), ImmutableMap.of("Name", MaskMode.NULLIFY));
+ }
+
+ public void testMergeTwoFilesWithDifferentSchemaSetup(
+ Boolean wrongSchemaInInputFile, Map<String, String> renameColumns, Map<String, MaskMode> maskColumns)
+ throws Exception {
MessageType schema1 = new MessageType(
"schema",
new PrimitiveType(OPTIONAL, INT64, "DocId"),
@@ -620,27 +698,32 @@ public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInput
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());
- if (wrongSchemaInInputFile) {
- inputFiles.add(new TestFileBuilder(conf, schema2)
- .withNumRecord(numRecord)
- .withCodec("UNCOMPRESSED")
- .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
- .withWriterVersion(writerVersion)
- .build());
- } else {
- inputFilesToJoin.add(new TestFileBuilder(conf, schema2)
- .withNumRecord(numRecord)
- .withCodec("UNCOMPRESSED")
- .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
- .withWriterVersion(writerVersion)
- .build());
+ if (wrongSchemaInInputFile != null) {
+ if (wrongSchemaInInputFile) {
+ inputFiles.add(new TestFileBuilder(conf, schema2)
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(writerVersion)
+ .build());
+ } else {
+ inputFilesToJoin.add(new TestFileBuilder(conf, schema2)
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(writerVersion)
+ .build());
+ }
}
RewriteOptions.Builder builder = createBuilder(
inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()),
inputFilesToJoin.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()),
false);
- RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build();
+ RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy)
+ .renameColumns(renameColumns)
+ .mask(maskColumns)
+ .build();
// This should throw an exception: either the schemas are different or the rename/mask options are invalid
rewriter = new ParquetRewriter(options);
@@ -648,7 +731,7 @@ public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInput
@Test
public void testRewriteFileWithMultipleBlocks() throws Exception {
- ensureContainsGzipFile();
+ addGzipInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -823,12 +906,13 @@ public void testOneInputFileManyInputFilesToJoinSetup(boolean joinColumnsOverwri
new HashSet<>(pruneColumns),
maskColumns.keySet(),
fileDecryptionProperties,
- joinColumnsOverwrite); // Verify data
+ joinColumnsOverwrite,
+ emptyMap()); // Verify data
validateSchemaWithGenderColumnPruned(true); // Verify schema
validateCreatedBy(); // Verify original.created.by
assertEquals(inputBloomFilters.keySet(), rBloomFilters); // Verify bloom filters
verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.ZSTD), fileDecryptionProperties); // Verify codec
- validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite);
+ validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite, emptyMap());
}
private void testOneInputFileManyInputFilesToJoinSetup() throws IOException {
@@ -884,11 +968,26 @@ private MessageType createSchemaToJoin() {
new PrimitiveType(REPEATED, BINARY, "Forward")));
}
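+ /** Schema with "Gender" pruned and "Name" renamed to "NameRenamed", as expected in the rename test output. */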
+ private MessageType createSchemaWithRenamed() {
+ return new MessageType(
+ "schema",
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "NameRenamed"),
+ new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
+ new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"),
+ new GroupType(
+ OPTIONAL,
+ "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
+ }
+
private void validateColumnData(
Set<String> prunePaths,
Set<String> nullifiedPaths,
FileDecryptionProperties fileDecryptionProperties,
- Boolean joinColumnsOverwrite)
+ Boolean joinColumnsOverwrite,
+ Map<String, String> renameColumns)
throws IOException {
ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile))
.withConf(conf)
@@ -901,7 +1000,7 @@ private void validateColumnData(
List<Group> filesJoined = inputFilesToJoin.stream()
.flatMap(x -> Arrays.stream(x.getFileContent()))
.collect(Collectors.toList());
- BiFunction<String, Integer, Group> groups = (name, rowIdx) -> {
+ BiFunction<String, Integer, Group> groupsExpected = (name, rowIdx) -> {
if (!filesMain.get(0).getType().containsField(name)
|| joinColumnsOverwrite
&& !filesJoined.isEmpty()
@@ -915,50 +1014,53 @@ private void validateColumnData(
int totalRows =
inputFiles.stream().mapToInt(x -> x.getFileContent().length).sum();
for (int i = 0; i < totalRows; i++) {
- Group group = reader.read();
- assertNotNull(group);
+ Group groupActual = reader.read();
+ assertNotNull(groupActual);
if (!prunePaths.contains("DocId")) {
if (nullifiedPaths.contains("DocId")) {
- assertThrows(RuntimeException.class, () -> group.getLong("DocId", 0));
+ assertThrows(RuntimeException.class, () -> groupActual.getLong("DocId", 0));
} else {
assertEquals(
- group.getLong("DocId", 0), groups.apply("DocId", i).getLong("DocId", 0));
+ groupActual.getLong("DocId", 0),
+ groupsExpected.apply("DocId", i).getLong("DocId", 0));
}
}
if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
+ String colName = renameColumns.getOrDefault("Name", "Name");
assertArrayEquals(
- group.getBinary("Name", 0).getBytes(),
- groups.apply("Name", i).getBinary("Name", 0).getBytes());
+ groupActual.getBinary(colName, 0).getBytes(),
+ groupsExpected.apply("Name", i).getBinary("Name", 0).getBytes());
}
if (!prunePaths.contains("Gender") && !nullifiedPaths.contains("Gender")) {
assertArrayEquals(
- group.getBinary("Gender", 0).getBytes(),
- groups.apply("Gender", i).getBinary("Gender", 0).getBytes());
+ groupActual.getBinary("Gender", 0).getBytes(),
+ groupsExpected.apply("Gender", i).getBinary("Gender", 0).getBytes());
}
if (!prunePaths.contains("FloatFraction") && !nullifiedPaths.contains("FloatFraction")) {
assertEquals(
- group.getFloat("FloatFraction", 0),
- groups.apply("FloatFraction", i).getFloat("FloatFraction", 0),
+ groupActual.getFloat("FloatFraction", 0),
+ groupsExpected.apply("FloatFraction", i).getFloat("FloatFraction", 0),
0);
}
if (!prunePaths.contains("DoubleFraction") && !nullifiedPaths.contains("DoubleFraction")) {
assertEquals(
- group.getDouble("DoubleFraction", 0),
- groups.apply("DoubleFraction", i).getDouble("DoubleFraction", 0),
+ groupActual.getDouble("DoubleFraction", 0),
+ groupsExpected.apply("DoubleFraction", i).getDouble("DoubleFraction", 0),
0);
}
- Group subGroup = group.getGroup("Links", 0);
+ Group subGroup = groupActual.getGroup("Links", 0);
if (!prunePaths.contains("Links.Backward") && !nullifiedPaths.contains("Links.Backward")) {
assertArrayEquals(
subGroup.getBinary("Backward", 0).getBytes(),
- groups.apply("Links", i)
+ groupsExpected
+ .apply("Links", i)
.getGroup("Links", 0)
.getBinary("Backward", 0)
.getBytes());
@@ -970,7 +1072,8 @@ private void validateColumnData(
} else {
assertArrayEquals(
subGroup.getBinary("Forward", 0).getBytes(),
- groups.apply("Links", i)
+ groupsExpected
+ .apply("Links", i)
.getGroup("Links", 0)
.getBinary("Forward", 0)
.getBytes());
@@ -1014,13 +1117,22 @@ interface CheckedFunction {
R apply(T t) throws IOException;
}
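+ /** Replaces the top-level field of the given path according to the rename mapping; nested levels are left untouched. */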
+ private ColumnPath normalizeFieldsInPath(ColumnPath path, Map<String, String> renameColumns) {
+ String[] pathArray = path.toArray();
+ if (renameColumns != null) {
+ pathArray[0] = renameColumns.getOrDefault(pathArray[0], pathArray[0]);
+ }
+ return ColumnPath.get(pathArray);
+ }
+
/**
* Verify the page index is correct.
*
* @param exclude the columns to exclude from comparison, for example because they were nullified.
* @param joinColumnsOverwrite whether join columns overwrote existing overlapping columns.
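+ * @param renameColumns the mapping from original column names to the new names used in the output file.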
*/
- private void validatePageIndex(Set<String> exclude, boolean joinColumnsOverwrite) throws Exception {
+ private void validatePageIndex(Set<String> exclude, boolean joinColumnsOverwrite, Map<String, String> renameColumns)
+ throws Exception {
class BlockMeta {
final TransParquetFileReader reader;
final BlockMetaData blockMeta;
@@ -1058,6 +1170,8 @@ class BlockMeta {
List<BlockMeta> inBlocksJoined = blockMetaExtractor.apply(
inputFilesToJoin.stream().map(EncryptionTestFile::getFileName).collect(Collectors.toList()));
List<BlockMeta> outBlocks = blockMetaExtractor.apply(ImmutableList.of(outputFile));
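+ // Invert the rename mapping so output (renamed) column paths can be matched back to the input columns.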
+ Map<String, String> renameColumnsInverted =
+ renameColumns.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
for (int blockIdx = 0; blockIdx < outBlocks.size(); blockIdx++) {
BlockMetaData outBlockMeta = outBlocks.get(blockIdx).blockMeta;
TransParquetFileReader outReader = outBlocks.get(blockIdx).reader;
@@ -1066,17 +1180,18 @@ class BlockMeta {
TransParquetFileReader inReader;
BlockMetaData inBlockMeta;
ColumnChunkMetaData inChunk;
- if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())
+ ColumnPath colPath = normalizeFieldsInPath(outChunk.getPath(), renameColumnsInverted);
+ if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(colPath)
|| joinColumnsOverwrite
&& !inBlocksJoined.isEmpty()
- && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())) {
+ && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(colPath)) {
inReader = inBlocksJoined.get(blockIdx).reader;
inBlockMeta = inBlocksJoined.get(blockIdx).blockMeta;
- inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(outChunk.getPath());
+ inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(colPath);
} else {
inReader = inBlocksMain.get(blockIdx).reader;
inBlockMeta = inBlocksMain.get(blockIdx).blockMeta;
- inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(outChunk.getPath());
+ inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(colPath);
}
ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk);
@@ -1284,13 +1399,13 @@ conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER)
assertEquals(expectSchema, actualSchema);
}
- private void ensureContainsGzipFile() {
+ private void addGzipInputFile() {
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(this.gzipEncryptionTestFileWithoutBloomFilterColumn);
}
}
- private void ensureContainsUncompressedFile() {
+ private void addUncompressedInputFile() {
if (!inputFiles.contains(uncompressedEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(uncompressedEncryptionTestFileWithoutBloomFilterColumn);
}