Skip to content

Commit

Permalink
rework test case for ParquetIO::Stream methods
Browse files Browse the repository at this point in the history
1. move anonymous class into inner one for readbility
2. clarify the test targeting,narrow down to the stream wrapping/unwrapping.
  • Loading branch information
丘志钟(Zhizhong.Q) committed Jan 31, 2025
1 parent efa189b commit 2971888
Showing 1 changed file with 183 additions and 163 deletions.
346 changes: 183 additions & 163 deletions parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.iceberg.Files;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
Expand All @@ -65,7 +67,6 @@
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -228,186 +229,205 @@ public void testTwoLevelList() throws IOException {
assertThat(recordRead.get("topbytes")).isEqualTo(expectedBinary);
}

@Test
public void testStreamClosedProperly() throws IOException {
// test for input
{
class TestStreamClosedProperlyStream extends SeekableInputStream
implements DelegatingInputStream {
boolean thisClosed = false;
boolean delegateClosed = false;

{
reset();
}
private static class CloseAwareInputStream extends InputStream
implements Seekable, PositionedReadable {

@Override
public InputStream getDelegate() {
return new FSDataInputStream(
new InputStream() {
@Override
public int read() throws IOException {
return 0;
}

@Override
public void close() throws IOException {
delegateClosed = true;
}
});
}
boolean isClosed = false;

@Override
public long getPos() throws IOException {
return 0;
}
@Override
public void close() throws IOException {
super.close();
this.isClosed = true;
}

@Override
public void seek(long newPos) throws IOException {}
@Override
public int read() throws IOException {
return 0;
}

@Override
public int read() throws IOException {
return 0;
}
@Override
public int read(long l, byte[] bytes, int i, int i1) throws IOException {
return 0;
}

@Override
public void close() throws IOException {
thisClosed = true;
delegateClosed = true;
}
@Override
public void readFully(long l, byte[] bytes, int i, int i1) throws IOException {}

public void reset() {
thisClosed = false;
delegateClosed = false;
}
}
@Override
public void readFully(long l, byte[] bytes) throws IOException {}

try (TestStreamClosedProperlyStream stream = new TestStreamClosedProperlyStream()) {
try (org.apache.parquet.io.SeekableInputStream _unused =
ParquetIO.file(
new InputFile() {
@Override
public long getLength() {
return 0;
}

@Override
public SeekableInputStream newStream() {
stream.reset();
return stream;
}

@Override
public String location() {
return "";
}

@Override
public boolean exists() {
return false;
}
})
.newStream()) {
assertThat(stream.delegateClosed).isFalse();
assertThat(stream.thisClosed).isFalse();
} finally {
assertThat(stream.delegateClosed).isTrue();
assertThat(stream.thisClosed).isTrue();
}
}
@Override
public void seek(long l) throws IOException {}

@Override
public long getPos() throws IOException {
return 0;
}

// test for output
{
class TestStreamClosedProperlyStream extends PositionOutputStream
implements DelegatingOutputStream {
boolean thisClosed = false;
boolean delegateClosed = false;
@Override
public boolean seekToNewSource(long l) throws IOException {
return false;
}
}

{
reset();
}
private static class CloseAwareFSDataInputStream extends FSDataInputStream {

@Override
public long getPos() throws IOException {
return 0;
}
boolean isClosed = false;

@Override
public void write(int b) throws IOException {}

@Override
public OutputStream getDelegate() {
try {
return new FSDataOutputStream(
new OutputStream() {
@Override
public void write(int b) {}

@Override
public void close() throws IOException {
delegateClosed = true;
}
},
null);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public CloseAwareFSDataInputStream(CloseAwareInputStream in) {
super(in);
}

@Override
public void close() throws IOException {
thisClosed = true;
delegateClosed = true;
}
@Override
public void close() throws IOException {
super.close();
this.isClosed = true;
}
}

private static class CloseAwareDelegatingInputStream extends SeekableInputStream
implements DelegatingInputStream {

public void reset() {
thisClosed = false;
delegateClosed = false;
boolean isClosed = false;
private final InputStream delegate;

public CloseAwareDelegatingInputStream(InputStream delegate) {
this.delegate = delegate;
}

@Override
public int read() throws IOException {
return 0;
}

@Override
public void close() throws IOException {
super.close();
this.getDelegate().close();
this.isClosed = true;
}

@Override
public InputStream getDelegate() {
return delegate;
}

@Override
public long getPos() {
return 0;
}

@Override
public void seek(long l) {}
}

@Test
public void testDelegatingInputStreamCloseProperly() throws IOException {
// prepare the underlying stream
try (CloseAwareInputStream underlying = new CloseAwareInputStream()) {
// special case for hadoop stream
try (CloseAwareFSDataInputStream fsInput = new CloseAwareFSDataInputStream(underlying)) {
// then prepare the delegating stream
try (CloseAwareDelegatingInputStream delegating =
new CloseAwareDelegatingInputStream(fsInput)) {
// ok, call the testing target, ensure no leek.
try (org.apache.parquet.io.SeekableInputStream _unused = ParquetIO.stream(delegating)) {
assertThat(underlying.isClosed).isFalse();
assertThat(fsInput.isClosed).isFalse();
assertThat(delegating.isClosed).isFalse();
} finally {
// a try-catch-finally for `_unused` stream.
// implies all stream crated before should be closed without leaking behavior.
assertThat(delegating.isClosed).isTrue();
assertThat(fsInput.isClosed).isTrue();
assertThat(underlying.isClosed).isTrue();
}
}
}
}
}

try (TestStreamClosedProperlyStream stream = new TestStreamClosedProperlyStream()) {
OutputFile file =
ParquetIO.file(
new org.apache.iceberg.io.OutputFile() {
@Override
public PositionOutputStream create() {
stream.reset();
return stream;
}

@Override
public PositionOutputStream createOrOverwrite() {
stream.reset();
return stream;
}

@Override
public String location() {
return "";
}

@Override
public InputFile toInputFile() {
return null;
}
});

try (org.apache.parquet.io.PositionOutputStream _unused = file.create(0)) {
assertThat(stream.delegateClosed).isFalse();
assertThat(stream.thisClosed).isFalse();
} finally {
assertThat(stream.delegateClosed).isTrue();
assertThat(stream.thisClosed).isTrue();
}
private static class CloseAwareOutputStream extends OutputStream {

boolean isClosed = false;

@Override
public void write(int b) {}

@Override
public void close() throws IOException {
super.close();
this.isClosed = true;
}
}

private static class CloseAwareFSDataOutputStream extends FSDataOutputStream {

boolean isClosed = false;

public CloseAwareFSDataOutputStream(OutputStream out) throws IOException {
super(out, null);
}

try (org.apache.parquet.io.PositionOutputStream _unused = file.createOrOverwrite(0)) {
assertThat(stream.delegateClosed).isFalse();
assertThat(stream.thisClosed).isFalse();
} finally {
assertThat(stream.delegateClosed).isTrue();
assertThat(stream.thisClosed).isTrue();
@Override
public void close() throws IOException {
super.close();
this.isClosed = true;
}
}

private static class CloseAwareDelegatingOutputStream extends PositionOutputStream
implements DelegatingOutputStream {

boolean isClosed = false;
private final OutputStream delegate;

public CloseAwareDelegatingOutputStream(OutputStream delegate) {
this.delegate = delegate;
}

@Override
public void close() throws IOException {
super.close();
this.getDelegate().close();
this.isClosed = true;
}

@Override
public OutputStream getDelegate() {
return delegate;
}

@Override
public long getPos() {
return 0;
}

@Override
public void write(int b) {}
}

@Test
public void testDelegatingOutputStreamCloseProperly() throws IOException {
// prepare the underlying stream
try (CloseAwareOutputStream underlying = new CloseAwareOutputStream()) {
// special case for hadoop stream
try (CloseAwareFSDataOutputStream fsOutput = new CloseAwareFSDataOutputStream(underlying)) {
// then prepare the delegating stream
try (CloseAwareDelegatingOutputStream delegating =
new CloseAwareDelegatingOutputStream(fsOutput)) {
// ok, call the testing target, ensure no leek.
try (org.apache.parquet.io.PositionOutputStream _unused = ParquetIO.stream(delegating)) {
assertThat(underlying.isClosed).isFalse();
assertThat(fsOutput.isClosed).isFalse();
assertThat(delegating.isClosed).isFalse();
} finally {
// a try-catch-finally for `_unused` stream.
// implies all stream crated before should be closed without leaking behavior.
assertThat(delegating.isClosed).isTrue();
assertThat(fsOutput.isClosed).isTrue();
assertThat(underlying.isClosed).isTrue();
}
}
}
}
Expand Down

0 comments on commit 2971888

Please sign in to comment.