Skip to content

Commit

Permalink
MongoDB compatible values fixes neo4j-contrib#520
Browse files Browse the repository at this point in the history
  • Loading branch information
albertodelazzari committed Aug 2, 2017
1 parent 126df2f commit 1067f67
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 61 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ dependencies {
compileOnly(group: 'org.neo4j', name: 'neo4j-enterprise', version:neo4jVersion)
compileOnly(group: 'org.codehaus.jackson', name: 'jackson-mapper-asl', version:'1.9.7')
testCompile(group: 'org.codehaus.jackson', name: 'jackson-mapper-asl', version:'1.9.7')
compileOnly(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.0')
testCompile(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.0')
compileOnly(group: 'org.ow2.asm', name: 'asm', version:'5.0.2')
compile group: 'com.github.javafaker', name: 'javafaker', version:'0.10'

Expand Down
27 changes: 20 additions & 7 deletions docs/overview.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -323,35 +323,48 @@ config contains any of: `{skip:1,limit:5,header:false,sep:'TAB',ignore:['tmp'],a

[cols="3m,2"]
|===
| CALL apoc.mongodb.get(host-or-port,db-or-null,collection-or-null,query-or-null) yield value | perform a find operation on mongodb collection
| CALL apoc.mongodb.get(host-or-port,db-or-null,collection-or-null,query-or-null,[compatibleValues=true|false]) yield value | perform a find operation on mongodb collection
| CALL apoc.mongodb.count(host-or-port,db-or-null,collection-or-null,query-or-null) yield value | perform a find operation on mongodb collection
| CALL apoc.mongodb.first(host-or-port,db-or-null,collection-or-null,query-or-null) yield value | perform a first operation on mongodb collection
| CALL apoc.mongodb.find(host-or-port,db-or-null,collection-or-null,query-or-null,projection-or-null,sort-or-null) yield value | perform a find,project,sort operation on mongodb collection
| CALL apoc.mongodb.first(host-or-port,db-or-null,collection-or-null,query-or-null,[compatibleValues=true|false]) yield value | perform a first operation on mongodb collection
| CALL apoc.mongodb.find(host-or-port,db-or-null,collection-or-null,query-or-null,projection-or-null,sort-or-null,[compatibleValues=true|false]) yield value | perform a find,project,sort operation on mongodb collection
| CALL apoc.mongodb.insert(host-or-port,db-or-null,collection-or-null,list-of-maps) | inserts the given documents into the mongodb collection
| CALL apoc.mongodb.delete(host-or-port,db-or-null,collection-or-null,list-of-maps) | inserts the given documents into the mongodb collection
| CALL apoc.mongodb.update(host-or-port,db-or-null,collection-or-null,list-of-maps) | inserts the given documents into the mongodb collection
|===

If your documents have date fields or any other type that can be automatically converted by Neo4j, you need to set *compatibleValues* to true. These values will be converted according to Jackson databind default mapping.

Copy these jars into the plugins directory:

* bson-3.4.2.jar
* mongo-java-driver-3.4.2.jar
* mongodb-driver-3.4.2.jar
* mongodb-driver-core-3.4.2.jar
* jackson-annotations-2.9.0.jar
* jackson-core-2.9.0.jar
* jackson-databind-2.9.0.jar

You should be able to get them from https://mongodb.github.io/mongo-java-driver/[here] and https://mvnrepository.com/artifact/org.mongodb/bson/3.4.2[here (BSON)] (via Download)
You should be able to get them from https://mongodb.github.io/mongo-java-driver/[here], https://mvnrepository.com/artifact/com.fasterxml.jackson.core[here (fasterxml-jackson)] and https://mvnrepository.com/artifact/org.mongodb/bson/3.4.2[here (BSON)] (via Download)

Or you get them locally from your maven build of apoc.
Or you get them locally from your gradle build of apoc.

----
mvn dependency:copy-dependencies
cp target/dependency/mongodb*.jar target/dependency/bson*.jar $NEO4J_HOME/plugins/
gradle copyRuntimeLibs
cp lib/mongodb*.jar lib/bson*.jar lib/jackson-*-2.9.0.jar $NEO4J_HOME/plugins/
----

[source,cypher]
----
CALL apoc.mongodb.first('mongodb://localhost:27017','test','test',{name:'testDocument'})
----

If you need automatic conversion of *unpackable* values then the cypher query will be:

[source,cypher]
----
CALL apoc.mongodb.first('mongodb://localhost:27017','test','test',{name:'testDocument'},true)
----

// end::mongodb[]

== Interacting with Couchbase
Expand Down
66 changes: 37 additions & 29 deletions src/main/java/apoc/mongodb/MongoDB.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package apoc.mongodb;

import apoc.util.MissingDependencyException;
import org.neo4j.procedure.Description;
import apoc.result.LongResult;
import apoc.result.MapResult;
import apoc.util.MissingDependencyException;
import apoc.util.UrlResolver;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;

Expand Down Expand Up @@ -45,14 +45,14 @@
public class MongoDB {

@Procedure
@Description("apoc.mongodb.get(host-or-port,db-or-null,collection-or-null,query-or-null) yield value - perform a find operation on mongodb collection")
public Stream<MapResult> get(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("query") Map<String,Object> query) {
return getMongoColl(hostOrKey, db, collection).all(query).map(MapResult::new);
@Description("apoc.mongodb.get(host-or-port,db-or-null,collection-or-null,query-or-null,[compatibleValues=true|false]) yield value - perform a find operation on mongodb collection")
public Stream<MapResult> get(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("query") Map<String, Object> query, @Name(value = "compatibleValues", defaultValue = "false") boolean compatibleValues) {
return getMongoColl(hostOrKey, db, collection).all(query, compatibleValues).map(MapResult::new);
}

@Procedure
@Description("apoc.mongodb.count(host-or-port,db-or-null,collection-or-null,query-or-null) yield value - perform a find operation on mongodb collection")
public Stream<LongResult> count(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("query") Map<String,Object> query) {
public Stream<LongResult> count(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("query") Map<String, Object> query) {
long count = getMongoColl(hostOrKey, db, collection).count(query);
return Stream.of(new LongResult(count));
}
Expand All @@ -63,46 +63,46 @@ private Coll getColl(@Name("host") String hostOrKey, @Name("db") String db, @Nam
}

@Procedure
@Description("apoc.mongodb.first(host-or-port,db-or-null,collection-or-null,query-or-null) yield value - perform a first operation on mongodb collection")
public Stream<MapResult> first(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("query") Map<String,Object> query) {
Map<String, Object> result = getMongoColl(hostOrKey, db, collection).first(query);
@Description("apoc.mongodb.first(host-or-port,db-or-null,collection-or-null,query-or-null,[compatibleValues=true|false]) yield value - perform a first operation on mongodb collection")
public Stream<MapResult> first(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("query") Map<String, Object> query, @Name(value = "compatibleValues", defaultValue = "false") boolean compatibleValues) {
Map<String, Object> result = getMongoColl(hostOrKey, db, collection).first(query, compatibleValues);
return Stream.of(new MapResult(result));
}

@Procedure
@Description("apoc.mongodb.find(host-or-port,db-or-null,collection-or-null,query-or-null,projection-or-null,sort-or-null) yield value - perform a find,project,sort operation on mongodb collection")
public Stream<MapResult> find(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("query") Map<String,Object> query,@Name("project") Map<String,Object> project,@Name("sort") Map<String,Object> sort) {
return getMongoColl(hostOrKey, db, collection).find(query,project,sort).map(MapResult::new);
@Description("apoc.mongodb.find(host-or-port,db-or-null,collection-or-null,query-or-null,projection-or-null,sort-or-null,[compatibleValues=true|false]) yield value - perform a find,project,sort operation on mongodb collection")
public Stream<MapResult> find(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("query") Map<String, Object> query, @Name("project") Map<String, Object> project, @Name("sort") Map<String, Object> sort, @Name(value = "compatibleValues", defaultValue = "false") boolean compatibleValues) {
return getMongoColl(hostOrKey, db, collection).find(query, project, sort, compatibleValues).map(MapResult::new);
}

@Procedure
@Description("apoc.mongodb.insert(host-or-port,db-or-null,collection-or-null,list-of-maps) - inserts the given documents into the mongodb collection")
public void insert(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("documents") List<Map<String,Object>> documents) {
public void insert(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("documents") List<Map<String, Object>> documents) {
getMongoColl(hostOrKey, db, collection).insert(documents);
}

@Procedure
@Description("apoc.mongodb.delete(host-or-port,db-or-null,collection-or-null,list-of-maps) - inserts the given documents into the mongodb collection")
public Stream<LongResult> delete(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("query") Map<String,Object> query) {
public Stream<LongResult> delete(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("query") Map<String, Object> query) {
return Stream.of(new LongResult(getMongoColl(hostOrKey, db, collection).delete(query)));
}

@Procedure
@Description("apoc.mongodb.update(host-or-port,db-or-null,collection-or-null,list-of-maps) - inserts the given documents into the mongodb collection")
public Stream<LongResult> update(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("query") Map<String,Object> query, @Name("update") Map<String,Object> update) {
return Stream.of(new LongResult(getMongoColl(hostOrKey, db, collection).update(query,update)));
public Stream<LongResult> update(@Name("host") String hostOrKey, @Name("db") String db, @Name("collection") String collection, @Name("query") Map<String, Object> query, @Name("update") Map<String, Object> update) {
return Stream.of(new LongResult(getMongoColl(hostOrKey, db, collection).update(query, update)));
}

private String getMongoDBUrl(String hostOrKey) {
return new UrlResolver("mongodb", "localhost", 27017).getUrl("mongodb", hostOrKey);
}

private Coll getMongoColl(String hostOrKey, String db, String collection){
private Coll getMongoColl(String hostOrKey, String db, String collection) {
Coll coll = null;
try {
coll = getColl(hostOrKey, db, collection);
}
catch (NoClassDefFoundError e) {
throw new MissingDependencyException("Cannot find the jar into the plugins folder. \n"+
} catch (NoClassDefFoundError e) {
throw new MissingDependencyException("Cannot find the jar into the plugins folder. \n" +
"Please put these jar in the plugins folder :\n\n" +
"bson-x.y.z.jar\n" +
"\n" +
Expand All @@ -112,26 +112,34 @@ private Coll getMongoColl(String hostOrKey, String db, String collection){
"\n" +
"mongodb-driver-core-x.y.z.jar\n" +
"\n" +
"See the documentation: https://neo4j-contrib.github.io/neo4j-apoc-procedures/#_interacting_with_mongodb");
"jackson-annotations-x.y.z.jar\n\njackson-core-x.y.z.jar\n\njackson-databind-x.y.z.jar\n\nSee the documentation: https://neo4j-contrib.github.io/neo4j-apoc-procedures/#_interacting_with_mongodb");
}
return coll;
}

interface Coll extends Closeable {
Map<String,Object> first(Map<String,Object> params);
Stream<Map<String,Object>> all(Map<String,Object> query);
Map<String, Object> first(Map<String, Object> params, boolean compatibleValues);

Stream<Map<String, Object>> all(Map<String, Object> query, boolean compatibleValues);

long count(Map<String, Object> query);
Stream<Map<String,Object>> find(Map<String,Object> query, Map<String,Object> project, Map<String,Object> sort);
void insert(List<Map<String,Object>> docs);
long update(Map<String,Object> query, Map<String,Object> update);
long delete(Map<String,Object> query);

Stream<Map<String, Object>> find(Map<String, Object> query, Map<String, Object> project, Map<String, Object> sort, boolean compatibleValues);

void insert(List<Map<String, Object>> docs);

long update(Map<String, Object> query, Map<String, Object> update);

long delete(Map<String, Object> query);

class Factory {
public static Coll create(String url, String db, String coll) {
try {
return (Coll)Class.forName("apoc.mongodb.MongoDBColl").getConstructor(String.class,String.class,String.class).newInstance(url,db,coll);
System.out.println(String.format("%s %s %s", url, db, coll));
return (Coll) Class.forName("apoc.mongodb.MongoDBColl").getConstructor(String.class, String.class, String.class).newInstance(url, db, coll);
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException | ClassNotFoundException e) {
throw new RuntimeException("Could not create MongoDBClientWrapper instance",e);
e.printStackTrace();
throw new RuntimeException("Could not create MongoDBClientWrapper instance", e);
}
}
}
Expand Down
32 changes: 23 additions & 9 deletions src/main/java/apoc/mongodb/MongoDBColl.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package apoc.mongodb;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
Expand All @@ -24,6 +27,7 @@
*/
class MongoDBColl implements MongoDB.Coll {

private final ObjectMapper jsonMapper = new ObjectMapper().enable(DeserializationFeature.USE_LONG_FOR_INTS);
private MongoCollection<Document> collection;
private MongoClient mongoClient;

Expand All @@ -44,9 +48,19 @@ public void close() throws IOException {
* but a simple String representation of it
*
* @param document
* @param compatibleValues if true we convert the document to JSON and than back to a Map
* @return
*/
private Map<String, Object> documentToPackableMap(Map<String, Object> document) {
private Map<String, Object> documentToPackableMap(Map<String, Object> document, boolean compatibleValues) {
if (compatibleValues) {
try {
return jsonMapper.readValue(jsonMapper.writeValueAsBytes(document), new TypeReference<Map<String, Object>>() {
});
} catch (Exception e) {
throw new RuntimeException("Cannot convert document to json and back to Map " + e.getMessage());
}
}

/**
* A document in MongoDB has a special field "_id" of type ObjectId
* This object is not "packable" by Neo4jPacker so it must be converted to a value that Neo4j can deal with
Expand All @@ -66,32 +80,32 @@ private Map<String, Object> documentToPackableMap(Map<String, Object> document)
}

@Override
public Map<String, Object> first(Map<String, Object> query) {
return documentToPackableMap(collection.find(new Document(query)).first());
public Map<String, Object> first(Map<String, Object> query, boolean compatibleValues) {
return documentToPackableMap(collection.find(new Document(query)).first(), compatibleValues);
}

@Override
public Stream<Map<String, Object>> all(Map<String, Object> query) {
return asStream(query == null ? collection.find() : collection.find(new Document(query)));
public Stream<Map<String, Object>> all(Map<String, Object> query, boolean compatibleValues) {
return asStream(query == null ? collection.find() : collection.find(new Document(query)), compatibleValues);
}

@Override
public long count(Map<String, Object> query) {
return query == null ? collection.count() : collection.count(new Document(query));
}

private Stream<Map<String, Object>> asStream(FindIterable<Document> result) {
private Stream<Map<String, Object>> asStream(FindIterable<Document> result, boolean compatibleValues) {
MongoCursor<Document> iterator = result.iterator();
Spliterator<Map<String, Object>> spliterator = Spliterators.spliterator(iterator, -1, Spliterator.ORDERED);
return StreamSupport.stream(spliterator, false).map(doc -> this.documentToPackableMap(doc)).onClose(iterator::close);
return StreamSupport.stream(spliterator, false).map(doc -> this.documentToPackableMap(doc, compatibleValues)).onClose(iterator::close);
}

@Override
public Stream<Map<String, Object>> find(Map<String, Object> query, Map<String, Object> project, Map<String, Object> sort) {
public Stream<Map<String, Object>> find(Map<String, Object> query, Map<String, Object> project, Map<String, Object> sort, boolean compatibleValues) {
FindIterable<Document> documents = query == null ? collection.find() : collection.find(new Document(query));
if (project != null) documents = documents.projection(new Document(project));
if (sort != null) documents = documents.sort(new Document(sort));
return asStream(documents);
return asStream(documents, compatibleValues);
}

@Override
Expand Down
Loading

0 comments on commit 1067f67

Please sign in to comment.