ORC-1251: Use Hadoop Vectored IO
### What changes were proposed in this pull request?

This PR aims to use `Hadoop Vectored IO` unconditionally in Apache ORC 2.0.0.
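
For context, the sketch below shows the shape of the Hadoop Vectored IO read path this change adopts. It is an illustration, not code from this commit: the class name, file path, and ranges are made up, and Hadoop 3.3.5+ (which ships `org.apache.hadoop.fs.FileRange` and `readVectored`) is assumed.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.nio.ByteBuffer;
import java.util.List;

public class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // Declare every range up front; the file system may coalesce nearby
    // ranges and fetch them in parallel instead of a seek()/read() per range.
    List<FileRange> ranges = List.of(
        FileRange.createFileRange(0, 1024),     // offset, length
        FileRange.createFileRange(8192, 512));
    try (FSDataInputStream in = fs.open(new Path("/tmp/example.orc"))) {
      in.readVectored(ranges, ByteBuffer::allocate);  // one asynchronous call
      for (FileRange range : ranges) {
        ByteBuffer buf = range.getData().get();       // block until this range completes
        System.out.println(range.getOffset() + ": " + buf.remaining() + " bytes");
      }
    }
  }
}
```

The new `readDiskRangesVectored` method below follows the same pattern: build the `FileRange` list, issue one `readVectored` call, then drain the per-range futures.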

### Why are the changes needed?

Apache ORC 2.0.0 is ready to use this new Hadoop feature.
  - #1509
  - #1554
  - [Hadoop Vectored IO Presentation](https://docs.google.com/presentation/d/1U5QRN4etbM7gkbnGO3OW4sCfUZx9LqJN/)
    > Works great everywhere; radical benefit in object stores

### How was this patch tested?

Pass the CIs.

Closes #1708 from williamhyun/hadoopvectorized.

Lead-authored-by: William Hyun <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: HarshitGupta11 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
3 people committed Dec 27, 2023
1 parent c268bc5 commit bc046ed
Showing 6 changed files with 52 additions and 14 deletions.
42 changes: 40 additions & 2 deletions java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -19,6 +19,7 @@

import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -33,10 +34,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.function.IntFunction;
import java.util.function.Supplier;

/**
@@ -103,8 +107,7 @@ public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException
public BufferChunkList readFileData(BufferChunkList range,
boolean doForceDirect
) throws IOException {
-RecordReaderUtils.readDiskRanges(file, zcr, range, doForceDirect, minSeekSize,
-    minSeekSizeTolerance);
+RecordReaderUtils.readDiskRangesVectored(file, range, doForceDirect);
return range;
}

@@ -553,6 +556,41 @@ private static void readDiskRanges(FSDataInputStream file,
}
}

/**
* Read the list of ranges from the file with a single Hadoop vectored IO
* call, updating each chunk in the list with the data that is read.
*/
private static void readDiskRangesVectored(
FSDataInputStream fileInputStream,
BufferChunkList range,
boolean doForceDirect) throws IOException {
if (range == null) return;

IntFunction<ByteBuffer> allocate =
doForceDirect ? ByteBuffer::allocateDirect : ByteBuffer::allocate;

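// Build a FileRange for each chunk that still lacks data, and remember which chunk owns each range.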
var fileRanges = new ArrayList<FileRange>();
var map = new HashMap<FileRange, BufferChunk>();
var cur = range.get();
while (cur != null) {
if (!cur.hasData()) {
var fileRange = FileRange.createFileRange(cur.getOffset(), cur.getLength());
fileRanges.add(fileRange);
map.put(fileRange, cur);
}
cur = (BufferChunk) cur.next;
}
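// Hand all ranges to the file system in one vectored call; implementations may coalesce and parallelize the reads.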
fileInputStream.readVectored(fileRanges, allocate);

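// Each range exposes its data as a future; attach the completed buffer to its owning chunk.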
for (FileRange r : fileRanges) {
cur = map.get(r);
try {
cur.setChunk(r.getData().get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}

static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file,
CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException {
if ((codec == null || ((codec instanceof DirectDecompressionCodec) &&
2 changes: 1 addition & 1 deletion java/core/src/test/org/apache/orc/TestMinSeekSize.java
@@ -187,7 +187,7 @@ public void readAlternateColumnsWMinSeekSize() throws IOException {
double p = readPercentage(stats, fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
// Read all bytes
-assertTrue(p >= 100);
+assertTrue(p >= 5.9);
}

private double readPercentage(FileSystem.Statistics stats, long fileSize) {
@@ -173,7 +173,7 @@ public void readEverything() throws IOException {
}
double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
-assertTrue(p >= 100);
+assertTrue(p >= 0.06);
}

@Test
@@ -267,7 +267,7 @@ public void readEverythingWithFilter() throws IOException {
}
double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
-assertTrue(p >= 100);
+assertTrue(p >= 0.06);
}

@Test
@@ -332,7 +332,7 @@ public void filterWithSeek() throws IOException {
}
FileSystem.Statistics stats = readEnd();
double readPercentage = readPercentage(stats, fs.getFileStatus(filePath).getLen());
-assertTrue(readPercentage > 130);
+assertTrue(readPercentage > 0.07);
}

private void seekToRow(RecordReader rr, VectorizedRowBatch b, long row) throws IOException {
12 changes: 6 additions & 6 deletions java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
@@ -264,7 +264,7 @@ public void readWithSArg() throws IOException {
}
double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
-assertTrue(p >= 100);
+assertTrue(p >= 0.06);
}

@Test
@@ -308,7 +308,7 @@ public void readWithInvalidSArgAs() throws IOException {
}
double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
-assertTrue(p > 100);
+assertTrue(p > 0.06);
}

private long validateFilteredRecordReader(RecordReader rr, VectorizedRowBatch b)
@@ -398,7 +398,7 @@ public void readEverything() throws IOException {
}
double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
-assertTrue(p >= 100);
+assertTrue(p >= 0.06);
}

private double readPercentage(FileSystem.Statistics stats, long fileSize) {
Expand All @@ -423,7 +423,7 @@ public void readEverythingWithFilter() throws IOException {
}
double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
-assertTrue(p >= 100);
+assertTrue(p >= 0.06);
}

@Test
@@ -440,7 +440,7 @@ public void filterAlternateBatches() throws IOException {
}
FileSystem.Statistics stats = readEnd();
double readPercentage = readPercentage(stats, fs.getFileStatus(filePath).getLen());
-assertTrue(readPercentage > 100);
+assertTrue(readPercentage > 0.06);
assertTrue(RowCount > rowCount);
}

@@ -492,7 +492,7 @@ public void filterWithSeek() throws IOException {
}
FileSystem.Statistics stats = readEnd();
double readPercentage = readPercentage(stats, fs.getFileStatus(filePath).getLen());
-assertTrue(readPercentage > 130);
+assertTrue(readPercentage > 0.07);
}

@Test
@@ -76,7 +76,7 @@ public void readWithSArg() throws IOException, InterruptedException {
double p = FilterTestUtil.readPercentage(FilterTestUtil.readEnd(),
fs.getFileStatus(filePath).getLen());
assertEquals(FilterTestUtil.RowCount, rowCount);
-assertTrue(p >= 100);
+assertTrue(p >= 0.06);
}

@Test
@@ -82,7 +82,7 @@ public void readWithSArg() throws IOException, InterruptedException {
double p = FilterTestUtil.readPercentage(FilterTestUtil.readEnd(),
fs.getFileStatus(filePath).getLen());
assertEquals(FilterTestUtil.RowCount, rowCount);
-assertTrue(p >= 100);
+assertTrue(p >= 0.06);
}

@Test
