Commit 1e669bc

Fixes #3823: Reading Parquet from s3

vga91 committed Dec 1, 2023
1 parent 4430171 · commit 1e669bc

Showing 10 changed files with 163 additions and 108 deletions.

build.gradle (1 addition, 3 deletions)

@@ -20,8 +20,6 @@ allprojects {
 }
 
 apply plugin: 'java-library'
-if (System.env.CI != null)
-    apply from: 'teamcity-repository.gradle'
 
 repositories {
 
@@ -66,7 +64,7 @@ subprojects {
         archiveClassifier = 'javadoc'
     }
     test {
-        //exclude '**/CypherProceduresClusterTest.class'//, '**/AtomicTest.class'
+        include '**/ParquetS3Test.class', '**/ParquetTest.class'
 
         // neo4jDockerImage system property is used in TestContainerUtil
         systemProperties 'user.language' : 'en' ,

extended/build.gradle (7 additions, 0 deletions)

@@ -108,6 +108,13 @@ dependencies {
     compileOnly group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1', withoutServers
     // testImplementation analogous is not needed since is bundled via `test-utils` submodule
     compileOnly group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.1.0', withoutServers
+
+    compileOnly group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.5', {
+        exclude group: 'com.amazonaws'
+    }
+    testImplementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.5', {
+        exclude group: 'com.amazonaws'
+    }
 
 
     // These dependencies affect the tests only, they will not be packaged in the resulting .jar
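
The key dependency change: hadoop-aws supplies org.apache.hadoop.fs.s3a.S3AFileSystem, the handler Hadoop-based Parquet code needs before it can resolve s3a:// URIs. The com.amazonaws exclusion suggests an AWS SDK is already provided elsewhere on APOC's classpath. As a hedged sketch (not code from this commit), configuring such a filesystem by hand typically looks like this; the credentials and endpoint are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.net.URI;

    public class S3AFileSystemSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Standard fs.s3a.* options; the values below are placeholders.
            conf.set("fs.s3a.access.key", "ACCESS_KEY");
            conf.set("fs.s3a.secret.key", "SECRET_KEY");
            conf.set("fs.s3a.endpoint", "http://localhost:4566"); // e.g. a LocalStack test endpoint
            conf.set("fs.s3a.path.style.access", "true");

            // hadoop-aws registers the s3a:// scheme, so FileSystem.get can resolve it.
            try (FileSystem fs = FileSystem.get(URI.create("s3a://my-bucket"), conf)) {
                System.out.println(fs.exists(new Path("s3a://my-bucket/test.parquet")));
            }
        }
    }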

extended/src/main/java/apoc/export/parquet/ExportParquetFileStrategy.java

@@ -3,10 +3,10 @@
 import apoc.Pools;
 import apoc.export.util.ProgressReporter;
 import apoc.result.ProgressInfo;
+import apoc.util.FileUtils;
 import apoc.util.QueueBasedSpliterator;
 import apoc.util.QueueUtil;
 import apoc.util.Util;
-import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.schema.MessageType;
@@ -26,7 +26,7 @@
 
 public abstract class ExportParquetFileStrategy<TYPE, IN> implements ExportParquetStrategy<IN, Stream<ProgressInfo>> {
 
-    private final String fileName;
+    private String fileName;
     private final GraphDatabaseService db;
     private final Pools pools;
     private final TerminationGuard terminationGuard;
@@ -49,12 +49,12 @@ public Stream<ProgressInfo> export(IN data, ParquetConfig config) {
         progressInfo.batchSize = config.getBatchSize();
         ProgressReporter reporter = new ProgressReporter(null, null, progressInfo);
 
-        Path fileToWrite = new Path(fileName);
         final BlockingQueue<ProgressInfo> queue = new ArrayBlockingQueue<>(10);
         Util.inTxFuture(pools.getDefaultExecutorService(), db, tx -> {
             int batchCount = 0;
             List<TYPE> rows = new ArrayList<>(config.getBatchSize());
-            ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(fileToWrite);
+            ParquetBufferedWriter parquetBufferedWriter = new ParquetBufferedWriter(FileUtils.getOutputStream(fileName));
+            ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(parquetBufferedWriter);
 
             try {
                 Iterator<TYPE> it = toIterator(reporter, data);
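
This hunk is the export-side core of the fix. ExampleParquetWriter.builder(Path) can only address a Hadoop filesystem, while an OutputFile backed by a plain OutputStream can address anything FileUtils.getOutputStream resolves, s3:// targets included. A hedged distillation of the new wiring; the helper class and target URL below are illustrative, not part of the commit:

    import apoc.export.parquet.ParquetBufferedWriter;
    import apoc.util.FileUtils;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.schema.MessageType;

    import java.io.IOException;

    public class ParquetTargetSketch {
        // Opens a writer over any target FileUtils can resolve, e.g. a local
        // "export.parquet" or, with S3 support configured, "s3://bucket/export.parquet".
        static ParquetWriter<Group> openWriter(String target, MessageType schema) throws IOException {
            ParquetBufferedWriter outputFile = new ParquetBufferedWriter(FileUtils.getOutputStream(target));
            return ExampleParquetWriter.builder(outputFile).withType(schema).build();
        }
    }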

extended/src/main/java/apoc/export/parquet/ExportParquetStrategy.java

@@ -9,7 +9,6 @@
 import java.io.IOException;
 import java.util.List;
 
-
 public interface ExportParquetStrategy<IN, OUT> {
 
     OUT export(IN data, ParquetConfig config);

extended/src/main/java/apoc/export/parquet/ExportParquetStreamStrategy.java

@@ -8,16 +8,13 @@
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.io.OutputFile;
-import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.schema.MessageType;
 import org.neo4j.graphdb.GraphDatabaseService;
 import org.neo4j.logging.Log;
 import org.neo4j.procedure.TerminationGuard;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -26,7 +23,6 @@
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
-
 public abstract class ExportParquetStreamStrategy<TYPE, IN> implements ExportParquetStrategy<IN, Stream<ByteArrayResult>> {
 
     private final GraphDatabaseService db;
@@ -96,61 +92,4 @@ private byte[] writeBatch(List<TYPE> rows, IN data, ParquetConfig config) {
 
     public abstract Iterator<TYPE> toIterator(IN data);
 
-    // create OutputFile
-    private record ParquetBufferedWriter(OutputStream out) implements OutputFile {
-
-        @Override
-        public PositionOutputStream create(long blockSizeHint) {
-            return createPositionOutputstream();
-        }
-
-        @Override
-        public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
-            return createPositionOutputstream();
-        }
-
-        private PositionOutputStream createPositionOutputstream() {
-            return new PositionOutputStream() {
-
-                int pos = 0;
-
-                @Override
-                public long getPos() throws IOException {
-                    return pos;
-                }
-
-                @Override
-                public void flush() throws IOException {
-                    out.flush();
-                }
-
-                @Override
-                public void close() throws IOException {
-                    out.close();
-                }
-
-                @Override
-                public void write(int b) throws IOException {
-                    out.write(b);
-                    pos++;
-                }
-
-                @Override
-                public void write(byte[] b, int off, int len) throws IOException {
-                    out.write(b, off, len);
-                    pos += len;
-                }
-            };
-        }
-
-        @Override
-        public boolean supportsBlockSize() {
-            return false;
-        }
-
-        @Override
-        public long defaultBlockSize() {
-            return 0;
-        }
-    }
 }

extended/src/main/java/apoc/export/parquet/ParquetBufferedWriter.java (new file, 64 additions)

@@ -0,0 +1,64 @@
package apoc.export.parquet;

import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

import java.io.IOException;
import java.io.OutputStream;

public record ParquetBufferedWriter(OutputStream out) implements OutputFile {

    @Override
    public PositionOutputStream create(long blockSizeHint) {
        return createPositionOutputstream();
    }

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) {
        return createPositionOutputstream();
    }

    private PositionOutputStream createPositionOutputstream() {
        return new PositionOutputStream() {

            int pos = 0;

            @Override
            public long getPos() {
                return pos;
            }

            @Override
            public void flush() throws IOException {
                out.flush();
            }

            @Override
            public void close() throws IOException {
                out.close();
            }

            @Override
            public void write(int b) throws IOException {
                out.write(b);
                pos++;
            }

            @Override
            public void write(byte[] b, int off, int len) throws IOException {
                out.write(b, off, len);
                pos += len;
            }
        };
    }

    @Override
    public boolean supportsBlockSize() {
        return false;
    }

    @Override
    public long defaultBlockSize() {
        return 0;
    }
}
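
This is the private ParquetBufferedWriter record from ExportParquetStreamStrategy promoted, essentially unchanged, to a top-level class so the file-based strategy above can reuse it. The running pos counter matters because ParquetWriter calls getPos() to record row-group and column-chunk offsets for the file footer, and supportsBlockSize() returning false tells Parquet there is no HDFS block size to pad row groups to. A hedged usage sketch, writing a one-row file to memory with an illustrative schema:

    import apoc.export.parquet.ParquetBufferedWriter;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    import java.io.ByteArrayOutputStream;

    public class ParquetBufferedWriterSketch {
        public static void main(String[] args) throws Exception {
            MessageType schema = MessageTypeParser.parseMessageType(
                    "message example { required binary name (UTF8); }");
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ParquetWriter<Group> writer = ExampleParquetWriter
                    .builder(new ParquetBufferedWriter(bytes))
                    .withType(schema)
                    .build()) {
                Group row = new SimpleGroupFactory(schema).newGroup();
                row.add("name", "alice");
                writer.write(row);
            }
            // bytes.toByteArray() now holds a complete Parquet file.
            System.out.println(bytes.size() + " bytes written");
        }
    }

One nit visible in the code: pos is an int while getPos() returns long, so positions past 2 GiB would wrap; fine for typical exports, but worth knowing.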

extended/src/main/java/apoc/export/parquet/ParquetReadUtil.java (4 additions, 15 deletions)

@@ -1,11 +1,9 @@
 package apoc.export.parquet;
 
-import apoc.ApocConfig;
+import apoc.util.CompressionAlgo;
+import apoc.util.FileUtils;
 import apoc.util.JsonUtil;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.DelegatingSeekableInputStream;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.SeekableInputStream;
@@ -33,7 +31,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static apoc.util.FileUtils.changeFileUrlIfImportDirectoryConstrained;
 
 public class ParquetReadUtil {
 
@@ -168,19 +165,11 @@ public static java.util.concurrent.TimeUnit toTimeUnitJava(LogicalTypeAnnotation
         };
     }
 
-
     public static InputFile getInputFile(Object source) throws IOException {
-        if (source instanceof String) {
-            ApocConfig.apocConfig().isImportFileEnabled();
-            String fileName = changeFileUrlIfImportDirectoryConstrained((String) source);
-            Path file = new Path(fileName);
-            return HadoopInputFile.fromPath(file, new Configuration());
-        }
-        return new ParquetStream((byte[]) source);
+        return new ParquetStream(FileUtils.inputStreamFor(source, null, null, CompressionAlgo.NONE.name()).readAllBytes());
     }
 
     public static ApocParquetReader getReader(Object source, ParquetConfig conf) {
-
         try {
             return new ApocParquetReader(getInputFile(source), conf);
         } catch (IOException e) {
@@ -229,4 +218,4 @@ public long getPos() {
            };
        }
    }
-}
+}
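
The read side mirrors the export change. getInputFile previously special-cased String sources through ApocConfig checks and a Hadoop Path/HadoopInputFile; now every source, String URL or byte[], flows through APOC's generic stream resolution and is buffered whole, with ParquetStream serving Parquet's seeks from that in-memory buffer. The trade-off versus HadoopInputFile is losing seekable ranged reads in exchange for dropping the Hadoop Configuration dependency and inheriting APOC's URL handling, which is what makes s3:// sources work. A minimal sketch of the new behaviour; the URL is a placeholder, and CompressionAlgo.NONE simply passes the bytes through uncompressed:

    import apoc.util.CompressionAlgo;
    import apoc.util.FileUtils;

    public class ParquetSourceSketch {
        // Resolves a String URL (file, http, s3, ...) or passes a byte[] through,
        // then buffers the entire Parquet file in memory.
        static byte[] readFully(Object source) throws Exception {
            return FileUtils.inputStreamFor(source, null, null, CompressionAlgo.NONE.name())
                    .readAllBytes();
        }

        public static void main(String[] args) throws Exception {
            byte[] parquetBytes = readFully("s3://my-bucket/data.parquet"); // placeholder URL
            System.out.println(parquetBytes.length + " bytes buffered");
        }
    }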

(3 more changed files not shown.)