Skip to content

Commit

Permalink
Add stored procedure loading parquet.
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-simons committed Jun 28, 2023
1 parent 04c048a commit 0d87e79
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 7 deletions.
33 changes: 31 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,43 @@ Ignore the tons of warnings, it's an experiment
mvn -Dfast clean package
```

## Server side batches (default)
### Running

#### Server side batches (default)

```bash
java -jar target/neo4j-load-parquet-1.0-SNAPSHOT.jar -abolt://localhost:7687 -uneo4j -pverysecret --label=Test ~/tmp/yellow_tripdata_2023-04.parquet
```

## Client side batches (slow and crappy)
#### Client side batches (slow and crappy)

**NOTE**: Client side batches are implemented as a number of statements per transaction. The code is using explicit transactions, using a parameterized query object and relies on the combined driver and server infrastructure to tune this. Right now, buffering statements and then using a halfway decent sized transaction is the only way to do client side batching with the Neo4j Java Driver. If one needs more, it's done through Cypher.

```bash
java -jar target/neo4j-load-parquet-1.0-SNAPSHOT.jar -abolt://localhost:7687 -uneo4j -pverysecret --mode=CLIENT_SIDE_BATCHING --batch-size=100 --label=Test2 ~/tmp/yellow_tripdata_2023-04.parquet
```

#### As stored procedure

This PoC can also be dropped into Neo4j's `plugins` folder.
After a server restart a procedure `loadParquet` is available.
This procedure takes one argument that should look like a URI and tries to resolve that to a Parquet file.
The URI can be a local file with or without `file://` scheme or any - by Java - supported URL. Credentials, headers etc. are not yet supported.
The stored procedure returns a map per row in the parquet file.

```cypher
CALL loadParquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet')
YIELD row
RETURN count(row) as numRows;
```

Or ingest all data for further processing (Note: Downloaded Parquet files are not cached, and downloading will add to the processing time. Use local files for benchmarking.)

```cypher
CALL loadParquet('/Users/msimons/tmp/yellow_tripdata_2023-04.parquet')
YIELD row
CALL {
WITH row
CREATE (r:Ride) SET r = row
} IN TRANSACTIONS OF 100000 ROWS;
```
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<maven.compiler.release>${java.version}</maven.compiler.release>
<maven.version>3.8.7</maven.version>
<neo4j-java-driver.version>5.9.0</neo4j-java-driver.version>
<neo4j.version>5.9.0</neo4j.version>
<parquet-floor.version>1.38</parquet-floor.version>
<picocli.version>4.7.0</picocli.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down Expand Up @@ -46,6 +47,12 @@
<artifactId>slf4j-simple</artifactId>
<version>${slf4j-simple.version}</version>
</dependency>
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j</artifactId>
<version>${neo4j.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/neo4j/parquet/loader/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Values;

import blue.strategic.parquet.ParquetReader;
import picocli.CommandLine;
Expand Down Expand Up @@ -103,7 +104,7 @@ public Integer call() throws Exception {
CALL {
WITH p DETACH DELETE p
} IN TRANSACTIONS OF $rows ROWS""", Map.of("label", label, "rows", batchSize)).consume();
var data = ParquetReader.streamContent(file, PropertiesHydrator::new);
var data = ParquetReader.streamContent(file, listOfColumns -> new PropertiesHydrator<>(listOfColumns, Values::value));
data.forEach(writer::addNode);
writer.flush();

Expand Down
43 changes: 43 additions & 0 deletions src/main/java/org/neo4j/parquet/loader/LoadParquet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.neo4j.parquet.loader;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.stream.Stream;

import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;

import blue.strategic.parquet.ParquetReader;

/**
 * Stored procedure {@code loadParquet}: resolves a URI-like string to a Parquet file and
 * streams its rows back as maps, one {@link MapResult} per Parquet record.
 * <p>
 * Local paths (with or without a {@code file://} scheme) are read in place; any other
 * scheme supported by {@link java.net.URL} is downloaded to a temporary file first.
 * Credentials, headers etc. are not supported.
 */
public final class LoadParquet {

	@Procedure(name = "loadParquet")
	public Stream<MapResult> impl(@Name("resource") String resource) throws IOException {

		var uri = URI.create(resource);
		var scheme = uri.getScheme();

		File inputFile;
		if (scheme == null || "file".equalsIgnoreCase(scheme)) {
			// Absolute URIs (file:///…) can be handed to File directly; bare paths go via getPath().
			inputFile = uri.isAbsolute() ? new File(uri) : new File(uri.getPath());
			// Fail fast with a clear message instead of an opaque error from the Parquet reader.
			if (!inputFile.isFile() || !inputFile.canRead()) {
				throw new IOException("Not a readable file: " + inputFile.getAbsolutePath());
			}
		} else {
			// Probably there's a better library than the one I picked that can deal with Parquet on remote hosts
			// Making that nice is not part of this PoC
			inputFile = File.createTempFile("neo4j-", ".parquet");
			// The returned stream reads lazily, so the file cannot be deleted here;
			// register it for cleanup on JVM shutdown to avoid accumulating temp files.
			inputFile.deleteOnExit();
			try (
				var in = uri.toURL().openStream();
				var out = new BufferedOutputStream(new FileOutputStream(inputFile))) {
				in.transferTo(out);
			}
		}

		return ParquetReader.streamContent(inputFile, listOfColumns -> new PropertiesHydrator<>(listOfColumns, MapResult::new));
	}

	/**
	 * One result row: the Parquet record hydrated into a map of column name to value.
	 */
	public record MapResult(Map<String, Object> row) {
	}
}
11 changes: 7 additions & 4 deletions src/main/java/org/neo4j/parquet/loader/PropertiesHydrator.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.parquet.column.ColumnDescriptor;
Expand All @@ -15,12 +16,14 @@
/**
* Hydrates Parquet records into a {@link Value} containing a map of properties.
*/
final class PropertiesHydrator implements Hydrator<Object[], Value> {
final class PropertiesHydrator<R> implements Hydrator<Object[], R> {

private final Map<String, Integer> index;
private final Function<Map<String, Object>, R> finisher;

PropertiesHydrator(List<ColumnDescriptor> columns) {
PropertiesHydrator(List<ColumnDescriptor> columns, Function<Map<String, Object>, R> finisher) {
this.index = new HashMap<>(columns.size());
this.finisher = finisher;
int idx = 0;
for (ColumnDescriptor d : columns) {
this.index.put(d.getPath()[d.getPath().length - 1], idx++);
Expand All @@ -39,9 +42,9 @@ public Object[] add(Object[] target, String heading, Object value) {
}

@Override
public Value finish(Object[] target) {
public R finish(Object[] target) {
return this.index.entrySet().stream()
.filter(e -> target[e.getValue()] != null)
.collect(Collectors.collectingAndThen(Collectors.toMap(Map.Entry::getKey, e -> target[e.getValue()]), Values::value));
.collect(Collectors.collectingAndThen(Collectors.toMap(Map.Entry::getKey, e -> target[e.getValue()]), finisher));
}
}

0 comments on commit 0d87e79

Please sign in to comment.