diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
index 2ecd7ad6d229..f98ab58921de 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
@@ -45,6 +45,7 @@
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -77,11 +78,12 @@ public class ORCRecordReader implements RecordReader {
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
     Configuration configuration = new Configuration();
-    Reader orcReader = OrcFile.createReader(new Path(dataFile.getAbsolutePath()),
+    File orcFile = RecordReaderUtils.unpackIfRequired(dataFile, "orc");
+    Reader orcReader = OrcFile.createReader(new Path(orcFile.getAbsolutePath()),
         OrcFile.readerOptions(configuration).filesystem(FileSystem.getLocal(configuration)));
     TypeDescription orcSchema = orcReader.getSchema();
-    Preconditions
-        .checkState(orcSchema.getCategory() == TypeDescription.Category.STRUCT, "ORC schema must be of type: STRUCT");
+    Preconditions.checkState(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+        "ORC schema must be of type: STRUCT");
 
     _orcFields = orcSchema.getFieldNames();
     _orcFieldTypes = orcSchema.getChildren();
@@ -128,9 +130,8 @@ private void initFieldsToRead(boolean[] orcReaderInclude, TypeDescription fieldT
       // Maps always have two child columns for its keys and values
       List<TypeDescription> children = fieldType.getChildren();
       TypeDescription.Category keyCategory = children.get(0).getCategory();
-      Preconditions
-          .checkState(isSupportedSingleValueType(keyCategory), "Illegal map key field type: %s (field %s)", keyCategory,
-              field);
+      Preconditions.checkState(isSupportedSingleValueType(keyCategory), "Illegal map key field type: %s (field %s)",
+          keyCategory, field);
       initFieldsToRead(orcReaderInclude, children.get(1), field);
     } else if (category == TypeDescription.Category.STRUCT) {
       List<String> childrenFieldNames = fieldType.getFieldNames();
@@ -141,9 +142,8 @@ private void initFieldsToRead(boolean[] orcReaderInclude, TypeDescription fieldT
       }
     } else {
       // Single-value field
-      Preconditions
-          .checkState(isSupportedSingleValueType(category), "Illegal single-value field type: %s (field %s)", category,
-              field);
+      Preconditions.checkState(isSupportedSingleValueType(category), "Illegal single-value field type: %s (field %s)",
+          category, field);
     }
   }
 
diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
index c244d93417f0..dd9b12a78ca3 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
@@ -17,9 +17,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
+import java.util.zip.GZIPOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -32,6 +38,7 @@
 import org.apache.orc.Writer;
 import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.annotations.Test;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -39,6 +46,13 @@
 public class ORCRecordReaderTest extends AbstractRecordReaderTest {
   private final File _dataFile = new File(_tempDir, "data.orc");
 
+  private void compressGzip(String sourcePath, String targetPath)
+      throws IOException {
+    try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
+      Files.copy(Paths.get(sourcePath), gos);
+    }
+  }
+
   @Override
   protected RecordReader createRecordReader()
       throws Exception {
@@ -143,4 +157,17 @@ protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
     }
     writer.close();
   }
+
+  @Test
+  public void testGzipORCRecordReader()
+      throws Exception {
+    String gzipFileName = "data.orc.gz";
+    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, gzipFileName));
+    final File gzDataFile = new File(_tempDir, gzipFileName);
+    ORCRecordReader orcRecordReader = new ORCRecordReader();
+    orcRecordReader.init(gzDataFile, _sourceFields, null);
+    checkValue(orcRecordReader, _records, _primaryKeys);
+    orcRecordReader.rewind();
+    checkValue(orcRecordReader, _records, _primaryKeys);
+  }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
index 9c494f0baf33..787ddc25e74a 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
@@ -29,6 +29,7 @@
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 
 /**
@@ -48,7 +49,8 @@ public class ParquetAvroRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    _dataFilePath = new Path(dataFile.getAbsolutePath());
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    _dataFilePath = new Path(parquetFile.getAbsolutePath());
     _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
     _recordExtractor = new AvroRecordExtractor();
     _recordExtractor.init(fieldsToRead, null);
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
index 3f413b9a684c..da89c8a382d3 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -37,6 +37,7 @@
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 
 /**
@@ -58,17 +59,16 @@ public class ParquetNativeRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    _dataFilePath = new Path(dataFile.getAbsolutePath());
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    _dataFilePath = new Path(parquetFile.getAbsolutePath());
     _hadoopConf = ParquetUtils.getParquetHadoopConfiguration();
     _recordExtractor = new ParquetNativeRecordExtractor();
     _recordExtractor.init(fieldsToRead, null);
 
-    _parquetReadOptions = ParquetReadOptions.builder()
-        .withMetadataFilter(ParquetMetadataConverter.NO_FILTER)
-        .build();
+    _parquetReadOptions = ParquetReadOptions.builder().withMetadataFilter(ParquetMetadataConverter.NO_FILTER).build();
 
-    _parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf),
-        _parquetReadOptions);
+    _parquetFileReader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf), _parquetReadOptions);
     _schema = _parquetFileReader.getFooter().getFileMetaData().getSchema();
     _pageReadStore = _parquetFileReader.readNextRowGroup();
     _columnIO = new ColumnIOFactory().getColumnIO(_schema);
@@ -76,7 +76,6 @@ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable Re
     _currentPageIdx = 0;
   }
 
-
   @Override
   public boolean hasNext() {
     if (_pageReadStore == null) {
@@ -119,8 +118,8 @@ public GenericRow next(GenericRow reuse)
   public void rewind()
       throws IOException {
     _parquetFileReader.close();
-    _parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf),
-        _parquetReadOptions);
+    _parquetFileReader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf), _parquetReadOptions);
     _pageReadStore = _parquetFileReader.readNextRowGroup();
     _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
     _currentPageIdx = 0;
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index 6be22851fc1b..60886b3b30dc 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -26,6 +26,7 @@
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 
 /**
@@ -39,6 +40,7 @@ public class ParquetRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
     if (recordReaderConfig != null && ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) {
       _internalParquetRecordReader = new ParquetAvroRecordReader();
     } else if (recordReaderConfig != null
@@ -47,14 +49,14 @@ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable Re
       _internalParquetRecordReader = new ParquetNativeRecordReader();
     } else {
       // No reader type specified. Determine using file metadata
-      if (ParquetUtils.hasAvroSchemaInFileMetadata(new Path(dataFile.getAbsolutePath()))) {
+      if (ParquetUtils.hasAvroSchemaInFileMetadata(new Path(parquetFile.getAbsolutePath()))) {
         _internalParquetRecordReader = new ParquetAvroRecordReader();
       } else {
         _useAvroParquetRecordReader = false;
         _internalParquetRecordReader = new ParquetNativeRecordReader();
       }
     }
-    _internalParquetRecordReader.init(dataFile, fieldsToRead, recordReaderConfig);
+    _internalParquetRecordReader.init(parquetFile, fieldsToRead, recordReaderConfig);
   }
 
   @Override
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index a55e3fa702bb..f576a0a32582 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -59,8 +59,8 @@ public static ParquetReader<GenericRecord> getParquetAvroReader(Path path)
    */
   public static ParquetWriter<GenericRecord> getParquetAvroWriter(Path path, Schema schema)
       throws IOException {
-    return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema)
-        .withConf(getParquetHadoopConfiguration()).build();
+    return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema).withConf(getParquetHadoopConfiguration())
+        .build();
   }
 
   /**
@@ -85,7 +85,8 @@ public static Schema getParquetAvroSchema(Path path)
     }
   }
 
-  public static boolean hasAvroSchemaInFileMetadata(Path path) throws IOException {
+  public static boolean hasAvroSchemaInFileMetadata(Path path)
+      throws IOException {
     ParquetMetadata footer = ParquetFileReader.readFooter(getParquetHadoopConfiguration(), path,
         ParquetMetadataConverter.NO_FILTER);
     Map<String, String> metaData = footer.getFileMetaData().getKeyValueMetaData();
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
index ffc80a98bba4..378195d9764a 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
@@ -56,8 +56,8 @@ protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       }
       records.add(record);
     }
-    try (ParquetWriter<GenericRecord> writer = ParquetUtils
-        .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
+    try (ParquetWriter<GenericRecord> writer = ParquetUtils.getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()),
+        schema)) {
       for (GenericRecord record : records) {
         writer.write(record);
       }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index 14dda0e5d1b4..345cb1cdbd20 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -20,10 +20,14 @@
 
 import com.google.common.collect.ImmutableSet;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.zip.GZIPOutputStream;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -40,6 +44,7 @@
 public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
   private final File _dataFile = new File(_tempDir, "data.parquet");
+  private final String _gzipFileName = "data.parquet.gz";
   private final File _testParquetFileWithInt96AndDecimal =
       new File(getClass().getClassLoader().getResource("test-file-with-int96-and-decimal.snappy.parquet").getFile());
 
   @Override
@@ -63,14 +68,21 @@ protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       }
       records.add(record);
     }
-    try (ParquetWriter<GenericRecord> writer = ParquetUtils
-        .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
+    try (ParquetWriter<GenericRecord> writer = ParquetUtils.getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()),
+        schema)) {
       for (GenericRecord record : records) {
         writer.write(record);
       }
     }
   }
 
+  private void compressGzip(String sourcePath, String targetPath)
+      throws IOException {
+    try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
+      Files.copy(Paths.get(sourcePath), gos);
+    }
+  }
+
   @Test
   public void testParquetAvroRecordReader()
       throws IOException {
@@ -108,7 +120,6 @@ public void testFileMetadataParsing()
     // Should be avro since file metadata has avro schema
     Assert.assertTrue(parquetRecordReader.useAvroParquetRecordReader());
 
-    final ParquetRecordReader parquetRecordReader2 = new ParquetRecordReader();
     File nativeParquetFile = new File(getClass().getClassLoader().getResource("users.parquet").getFile());
     parquetRecordReader.init(nativeParquetFile, null, null);
 
@@ -157,4 +168,42 @@ private void testComparison(File dataFile, int totalRecords)
     Assert.assertEquals(recordsRead, totalRecords,
         "Message read from ParquetRecordReader doesn't match the expected number.");
   }
+
+  @Test
+  public void testGzipParquetRecordReader()
+      throws IOException {
+    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
+    final File gzDataFile = new File(_tempDir, _gzipFileName);
+    ParquetRecordReader recordReader = new ParquetRecordReader();
+    recordReader.init(gzDataFile, _sourceFields, null);
+    testReadParquetFile(recordReader, SAMPLE_RECORDS_SIZE);
+  }
+
+  @Test
+  public void testGzipParquetAvroRecordReader()
+      throws IOException {
+    ParquetAvroRecordReader avroRecordReader = new ParquetAvroRecordReader();
+    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
+    final File gzDataFile = new File(_tempDir, _gzipFileName);
+    avroRecordReader.init(gzDataFile, null, new ParquetRecordReaderConfig());
+    testReadParquetFile(avroRecordReader, SAMPLE_RECORDS_SIZE);
+  }
+
+  @Test
+  public void testGzipParquetNativeRecordReader()
+      throws IOException {
+    ParquetNativeRecordReader nativeRecordReader = new ParquetNativeRecordReader();
+
+    final String gzParquetFileWithInt96AndDecimal =
+        String.format("%s.gz", _testParquetFileWithInt96AndDecimal.getAbsolutePath());
+    compressGzip(_testParquetFileWithInt96AndDecimal.getAbsolutePath(), gzParquetFileWithInt96AndDecimal);
+    final File gzTestParquetFileWithInt96AndDecimal = new File(gzParquetFileWithInt96AndDecimal);
+    nativeRecordReader.init(gzTestParquetFileWithInt96AndDecimal, ImmutableSet.of(), new ParquetRecordReaderConfig());
+    testReadParquetFile(nativeRecordReader, 1965);
+
+    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
+    final File gzDataFile = new File(_tempDir, _gzipFileName);
+    nativeRecordReader.init(gzDataFile, ImmutableSet.of(), new ParquetRecordReaderConfig());
+    testReadParquetFile(nativeRecordReader, SAMPLE_RECORDS_SIZE);
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
index 657686e196ac..fd84c5f306f3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
@@ -25,7 +25,10 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.RandomAccessFile;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
 import java.util.zip.GZIPInputStream;
 
 
@@ -57,4 +60,26 @@ public static InputStream getInputStream(File dataFile)
       return new FileInputStream(dataFile);
     }
   }
+
+  public static File unpackIfRequired(File dataFile, String extension)
+      throws IOException {
+    if (isGZippedFile(dataFile)) {
+      try (final InputStream inputStream = getInputStream(dataFile)) {
+        File targetFile = new File(String.format("%s.%s", dataFile.getAbsolutePath(), extension));
+        Files.copy(inputStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        return targetFile;
+      }
+    } else {
+      return dataFile;
+    }
+  }
+
+  private static boolean isGZippedFile(File file)
+      throws IOException {
+    int magic = 0;
+    try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
+      magic = raf.read() & 0xff | ((raf.read() << 8) & 0xff00);
+    }
+    return magic == GZIPInputStream.GZIP_MAGIC;
+  }
 }
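
Note, not part of the patch: a minimal caller-side sketch of the behavior this change enables. The file path and the GzipParquetReadSketch harness are hypothetical; the reader calls (init/hasNext/next/close) are the existing RecordReader interface.

import java.io.File;
import org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader;
import org.apache.pinot.spi.data.readers.GenericRow;

public class GzipParquetReadSketch {
  public static void main(String[] args)
      throws Exception {
    // Hypothetical input: a Parquet file compressed with plain gzip.
    File gzFile = new File("/tmp/data.parquet.gz");
    ParquetRecordReader reader = new ParquetRecordReader();
    // init() probes the first two bytes for the gzip magic number (0x1f 0x8b,
    // i.e. GZIPInputStream.GZIP_MAGIC read little-endian), inflates the payload
    // to /tmp/data.parquet.gz.parquet, and reads from that unpacked sibling file.
    reader.init(gzFile, null, null);
    while (reader.hasNext()) {
      GenericRow row = reader.next();
      System.out.println(row);
    }
    reader.close();
  }
}

The patch unpacks to a sibling file rather than streaming through GZIPInputStream because the ORC and Parquet readers need seekable, random-access input (footers and row groups are read out of order), unlike the line-oriented readers that already consume RecordReaderUtils.getInputStream directly.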