Skip to content

Commit

Permalink
fix format
Browse files Browse the repository at this point in the history
  • Loading branch information
zizon authored and 丘志钟(Zhizhong.Q) committed Jan 27, 2025
1 parent 3b2fbb0 commit efa189b
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,10 @@
package org.apache.iceberg.parquet;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.DelegatingInputStream;
import org.apache.iceberg.io.DelegatingOutputStream;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
Expand Down
157 changes: 83 additions & 74 deletions parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,29 +229,32 @@ public void testTwoLevelList() throws IOException {
}

@Test
public void testStreamClosedProperly() throws IOException{
public void testStreamClosedProperly() throws IOException {
// test for input
{
class TestStreamClosedProperlyStream extends SeekableInputStream implements DelegatingInputStream {
class TestStreamClosedProperlyStream extends SeekableInputStream
implements DelegatingInputStream {
boolean thisClosed = false;
boolean delegateClosed = false;

{
reset();
}

@Override
public InputStream getDelegate() {
return new FSDataInputStream(new InputStream() {
@Override
public int read() throws IOException {
return 0;
}

@Override
public void close() throws IOException {
delegateClosed = true;
}
});
return new FSDataInputStream(
new InputStream() {
@Override
public int read() throws IOException {
return 0;
}

@Override
public void close() throws IOException {
delegateClosed = true;
}
});
}

@Override
Expand All @@ -260,8 +263,7 @@ public long getPos() throws IOException {
}

@Override
public void seek(long newPos) throws IOException {
}
public void seek(long newPos) throws IOException {}

@Override
public int read() throws IOException {
Expand All @@ -274,35 +276,38 @@ public void close() throws IOException {
delegateClosed = true;
}

public void reset(){
public void reset() {
thisClosed = false;
delegateClosed = false;
}
}

try (TestStreamClosedProperlyStream stream = new TestStreamClosedProperlyStream()) {
try (org.apache.parquet.io.SeekableInputStream _unused = ParquetIO.file(new InputFile() {
@Override
public long getLength() {
return 0;
}

@Override
public SeekableInputStream newStream() {
stream.reset();
return stream;
}

@Override
public String location() {
return "";
}

@Override
public boolean exists() {
return false;
}
}).newStream()) {
try (org.apache.parquet.io.SeekableInputStream _unused =
ParquetIO.file(
new InputFile() {
@Override
public long getLength() {
return 0;
}

@Override
public SeekableInputStream newStream() {
stream.reset();
return stream;
}

@Override
public String location() {
return "";
}

@Override
public boolean exists() {
return false;
}
})
.newStream()) {
assertThat(stream.delegateClosed).isFalse();
assertThat(stream.thisClosed).isFalse();
} finally {
Expand All @@ -314,9 +319,11 @@ public boolean exists() {

// test for output
{
class TestStreamClosedProperlyStream extends PositionOutputStream implements DelegatingOutputStream {
class TestStreamClosedProperlyStream extends PositionOutputStream
implements DelegatingOutputStream {
boolean thisClosed = false;
boolean delegateClosed = false;

{
reset();
}
Expand All @@ -327,22 +334,22 @@ public long getPos() throws IOException {
}

@Override
public void write(int b) throws IOException {
}
public void write(int b) throws IOException {}

@Override
public OutputStream getDelegate() {
try {
return new FSDataOutputStream(new OutputStream() {
@Override
public void write(int b) {
}

@Override
public void close() throws IOException {
delegateClosed = true;
}
}, null);
return new FSDataOutputStream(
new OutputStream() {
@Override
public void write(int b) {}

@Override
public void close() throws IOException {
delegateClosed = true;
}
},
null);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -361,29 +368,31 @@ public void reset() {
}

try (TestStreamClosedProperlyStream stream = new TestStreamClosedProperlyStream()) {
OutputFile file = ParquetIO.file(new org.apache.iceberg.io.OutputFile() {
@Override
public PositionOutputStream create() {
stream.reset();
return stream;
}

@Override
public PositionOutputStream createOrOverwrite() {
stream.reset();
return stream;
}

@Override
public String location() {
return "";
}

@Override
public InputFile toInputFile() {
return null;
}
});
OutputFile file =
ParquetIO.file(
new org.apache.iceberg.io.OutputFile() {
@Override
public PositionOutputStream create() {
stream.reset();
return stream;
}

@Override
public PositionOutputStream createOrOverwrite() {
stream.reset();
return stream;
}

@Override
public String location() {
return "";
}

@Override
public InputFile toInputFile() {
return null;
}
});

try (org.apache.parquet.io.PositionOutputStream _unused = file.create(0)) {
assertThat(stream.delegateClosed).isFalse();
Expand Down

0 comments on commit efa189b

Please sign in to comment.