feat: add compressed file support for ORCRecordReader #9884

Merged: 5 commits, Dec 8, 2022
Changes from 2 commits

File: ORCRecordReader.java

@@ -20,7 +20,10 @@

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.InputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -45,6 +48,7 @@
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;

import static java.nio.charset.StandardCharsets.UTF_8;

@@ -77,7 +81,8 @@ public class ORCRecordReader implements RecordReader {
public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
Configuration configuration = new Configuration();
Reader orcReader = OrcFile.createReader(new Path(dataFile.getAbsolutePath()),
File orcFile = unpackIfRequired(dataFile);
Reader orcReader = OrcFile.createReader(new Path(orcFile.getAbsolutePath()),
OrcFile.readerOptions(configuration).filesystem(FileSystem.getLocal(configuration)));
TypeDescription orcSchema = orcReader.getSchema();
Preconditions
@@ -106,6 +111,18 @@ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
_nextRowId = 0;
}

private File unpackIfRequired(File dataFile) throws IOException {
Review comment (Contributor): Let's move this to the RecordReaderUtils class.

Reply (Author): Done! I decided to add the extension as an explicit function parameter since, as you mentioned above, the archive will not necessarily have a .gz suffix.
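
For illustration, the relocated shared helper with the extension passed in explicitly might look roughly like the sketch below. The merged RecordReaderUtils method is not part of this two-commit view, so the signature and names here are assumptions (it presumes the helper lives alongside isGZippedFile and getInputStream in RecordReaderUtils), not the actual merged code. The inline version at this commit follows right after.

// Hypothetical shape of the shared helper after the move; signature assumed.
public static File unpackIfRequired(File dataFile, String extension)
    throws IOException {
  if (!isGZippedFile(dataFile)) {
    return dataFile;
  }
  // Decompress next to the original file, forcing the expected extension,
  // because the gzipped input will not necessarily carry a .gz suffix.
  File targetFile = new File(String.format("%s.%s", dataFile.getAbsolutePath(), extension));
  try (InputStream inputStream = getInputStream(dataFile)) {
    Files.copy(inputStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
  }
  return targetFile;
}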

if (RecordReaderUtils.isGZippedFile(dataFile)) {
try (final InputStream inputStream = RecordReaderUtils.getInputStream(dataFile)) {
File targetFile = new File(String.format("%s.orc", dataFile.getAbsolutePath()));
Files.copy(inputStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
return targetFile;
}
} else {
return dataFile;
}
}

/**
* Initializes the fields to be read using the field ID. Traverses children fields in the case
* of struct, list, or map types.
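The body of the method documented above is collapsed in this view. As a rough illustration of that kind of traversal, here is a sketch using the common ORC include-array pattern over org.apache.orc.TypeDescription; the helper name and shape are assumptions, not the collapsed Pinot code.

// Sketch only: recursively mark a field and all of its nested children
// (struct fields, list elements, map keys/values) by ORC column ID.
// The include array is typically sized schema.getMaximumId() + 1.
private static void markFieldAndChildren(TypeDescription field, boolean[] include) {
  include[field.getId()] = true;
  if (field.getChildren() != null) {
    for (TypeDescription child : field.getChildren()) {
      markFieldAndChildren(child, include);
    }
  }
}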

File: ORCRecordReaderTest.java

@@ -18,8 +18,14 @@
* under the License.
*/
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -32,13 +38,21 @@
import org.apache.orc.Writer;
import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.testng.annotations.Test;

import static java.nio.charset.StandardCharsets.UTF_8;


public class ORCRecordReaderTest extends AbstractRecordReaderTest {
private final File _dataFile = new File(_tempDir, "data.orc");

private void compressGzip(String sourcePath, String targetPath) throws IOException {
try (GZIPOutputStream gos = new GZIPOutputStream(
new FileOutputStream(Paths.get(targetPath).toFile()))) {
Files.copy(Paths.get(sourcePath), gos);
}
}

@Override
protected RecordReader createRecordReader()
throws Exception {
@@ -143,4 +157,17 @@ protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
}
writer.close();
}

@Test
public void testGzipORCRecordReader()
throws Exception {
String gzipFileName = "data.orc.gz";
compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, gzipFileName));
final File gzDataFile = new File(_tempDir, gzipFileName);
ORCRecordReader orcRecordReader = new ORCRecordReader();
orcRecordReader.init(gzDataFile, _sourceFields, null);
checkValue(orcRecordReader, _records, _primaryKeys);
orcRecordReader.rewind();
checkValue(orcRecordReader, _records, _primaryKeys);
}
}

File: ParquetAvroRecordReader.java

@@ -30,6 +30,8 @@
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;

import static org.apache.pinot.plugin.inputformat.parquet.ParquetUtils.unpackIfRequired;


/**
* Avro Record reader for Parquet file. This reader doesn't read parquet file with incompatible Avro schemas,
@@ -48,7 +50,8 @@ public class ParquetAvroRecordReader implements RecordReader {
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
_dataFilePath = new Path(dataFile.getAbsolutePath());
File parquetFile = unpackIfRequired(dataFile);
_dataFilePath = new Path(parquetFile.getAbsolutePath());
_parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
_recordExtractor = new AvroRecordExtractor();
_recordExtractor.init(fieldsToRead, null);

File: ParquetNativeRecordReader.java

@@ -38,6 +38,8 @@
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;

import static org.apache.pinot.plugin.inputformat.parquet.ParquetUtils.unpackIfRequired;


/**
* Record reader for Native Parquet file.
@@ -58,7 +60,8 @@ public class ParquetNativeRecordReader implements RecordReader {
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
_dataFilePath = new Path(dataFile.getAbsolutePath());
File parquetFile = unpackIfRequired(dataFile);
_dataFilePath = new Path(parquetFile.getAbsolutePath());
_hadoopConf = ParquetUtils.getParquetHadoopConfiguration();
_recordExtractor = new ParquetNativeRecordExtractor();
_recordExtractor.init(fieldsToRead, null);

File: ParquetRecordReader.java

@@ -27,6 +27,8 @@
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;

import static org.apache.pinot.plugin.inputformat.parquet.ParquetUtils.unpackIfRequired;


/**
* Pinot Record reader for Parquet file.<p>
@@ -39,6 +41,7 @@ public class ParquetRecordReader implements RecordReader {
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
File parquetFile = unpackIfRequired(dataFile);
if (recordReaderConfig != null && ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) {
_internalParquetRecordReader = new ParquetAvroRecordReader();
} else if (recordReaderConfig != null
@@ -47,14 +50,14 @@ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
_internalParquetRecordReader = new ParquetNativeRecordReader();
} else {
// No reader type specified. Determine using file metadata
if (ParquetUtils.hasAvroSchemaInFileMetadata(new Path(dataFile.getAbsolutePath()))) {
if (ParquetUtils.hasAvroSchemaInFileMetadata(new Path(parquetFile.getAbsolutePath()))) {
_internalParquetRecordReader = new ParquetAvroRecordReader();
} else {
_useAvroParquetRecordReader = false;
_internalParquetRecordReader = new ParquetNativeRecordReader();
}
}
_internalParquetRecordReader.init(dataFile, fieldsToRead, recordReaderConfig);
_internalParquetRecordReader.init(parquetFile, fieldsToRead, recordReaderConfig);
}

@Override

File: ParquetUtils.java

@@ -18,7 +18,11 @@
*/
package org.apache.pinot.plugin.inputformat.parquet;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -34,6 +38,7 @@
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;


public class ParquetUtils {
@@ -102,4 +107,16 @@ public static Configuration getParquetHadoopConfiguration() {
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
return conf;
}

public static File unpackIfRequired(File dataFile) throws IOException {
if (RecordReaderUtils.isGZippedFile(dataFile)) {
try (final InputStream inputStream = RecordReaderUtils.getInputStream(dataFile)) {
File targetFile = new File(String.format("%s.parquet", dataFile.getAbsolutePath()));
Files.copy(inputStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
return targetFile;
}
} else {
return dataFile;
}
}
}

File: ParquetRecordReaderTest.java

@@ -20,10 +20,15 @@

import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -40,6 +45,7 @@

public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
private final File _dataFile = new File(_tempDir, "data.parquet");
private final String _gzipFileName = "data.parquet.gz";
private final File _testParquetFileWithInt96AndDecimal =
new File(getClass().getClassLoader().getResource("test-file-with-int96-and-decimal.snappy.parquet").getFile());

@@ -71,6 +77,13 @@ protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
}
}

private void compressGzip(String sourcePath, String targetPath) throws IOException {
try (GZIPOutputStream gos = new GZIPOutputStream(
new FileOutputStream(Paths.get(targetPath).toFile()))) {
Files.copy(Paths.get(sourcePath), gos);
}
}

@Test
public void testParquetAvroRecordReader()
throws IOException {
@@ -157,4 +170,42 @@ private void testComparison(File dataFile, int totalRecords)
Assert.assertEquals(recordsRead, totalRecords,
"Message read from ParquetRecordReader doesn't match the expected number.");
}

@Test
public void testGzipParquetRecordReader()
throws IOException {
compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
final File gzDataFile = new File(_tempDir, _gzipFileName);
ParquetRecordReader recordReader = new ParquetRecordReader();
recordReader.init(gzDataFile, _sourceFields, null);
testReadParquetFile(recordReader, SAMPLE_RECORDS_SIZE);
}

@Test
public void testGzipParquetAvroRecordReader()
throws IOException {
ParquetAvroRecordReader avroRecordReader = new ParquetAvroRecordReader();
compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
final File gzDataFile = new File(_tempDir, _gzipFileName);
avroRecordReader.init(gzDataFile, null, new ParquetRecordReaderConfig());
testReadParquetFile(avroRecordReader, SAMPLE_RECORDS_SIZE);
}

@Test
public void testGzipParquetNativeRecordReader()
throws IOException {
ParquetNativeRecordReader nativeRecordReader = new ParquetNativeRecordReader();

final String gzParquetFileWithInt96AndDecimal
= String.format("%s.gz", _testParquetFileWithInt96AndDecimal.getAbsolutePath());
compressGzip(_testParquetFileWithInt96AndDecimal.getAbsolutePath(), gzParquetFileWithInt96AndDecimal);
final File gzTestParquetFileWithInt96AndDecimal = new File(gzParquetFileWithInt96AndDecimal);
nativeRecordReader.init(gzTestParquetFileWithInt96AndDecimal, ImmutableSet.of(), new ParquetRecordReaderConfig());
testReadParquetFile(nativeRecordReader, 1965);

compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
final File gzDataFile = new File(_tempDir, _gzipFileName);
nativeRecordReader.init(gzDataFile, ImmutableSet.of(), new ParquetRecordReaderConfig());
testReadParquetFile(nativeRecordReader, SAMPLE_RECORDS_SIZE);
}
}

File: RecordReaderUtils.java

@@ -25,6 +25,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

@@ -57,4 +58,13 @@ public static InputStream getInputStream(File dataFile)
return new FileInputStream(dataFile);
}
}

public static boolean isGZippedFile(File file)
    throws IOException {
  // Read the first two bytes and assemble them little-endian so they can be
  // compared against GZIPInputStream.GZIP_MAGIC (0x8b1f, i.e. bytes 0x1f 0x8b).
  int magic = 0;
  try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
    magic = raf.read() & 0xff | ((raf.read() << 8) & 0xff00);
  }
  return magic == GZIPInputStream.GZIP_MAGIC;
}
}
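
For reference, a small standalone sketch of how this content-based check behaves; the class below is hypothetical and only illustrates usage.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.zip.GZIPOutputStream;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;

public class GzipDetectionExample {
  public static void main(String[] args) throws IOException {
    // Create a plain file and a gzipped copy of it.
    File plain = File.createTempFile("sample", ".orc");
    Files.write(plain.toPath(), "not gzip".getBytes(StandardCharsets.UTF_8));
    File gzipped = File.createTempFile("sample", ".orc.gz");
    try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(gzipped))) {
      Files.copy(plain.toPath(), gos);
    }
    // Detection reads the leading magic bytes, so the file name is irrelevant.
    System.out.println(RecordReaderUtils.isGZippedFile(gzipped)); // true
    System.out.println(RecordReaderUtils.isGZippedFile(plain));   // false
  }
}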