From 3cf59d0f4015ea67175934d7b6a5509d223d497c Mon Sep 17 00:00:00 2001 From: Giuseppe Villani Date: Thu, 16 Jan 2025 14:13:26 +0100 Subject: [PATCH 1/3] [NOID] Fixes #4153: Handling OpenAI 429's gracefully (#4284) (#4329) * Fixes #4153: Handling OpenAI 429's gracefully * cleanup * fix tests --- .../modules/ROOT/pages/ml/openai.adoc | 3 + full/src/main/java/apoc/ml/OpenAI.java | 18 ++- .../src/main/java/apoc/util/ExtendedUtil.java | 58 ++++++++++ .../test/java/apoc/util/ExtendedUtilTest.java | 108 ++++++++++++++++++ 4 files changed, 186 insertions(+), 1 deletion(-) create mode 100644 full/src/main/java/apoc/util/ExtendedUtil.java create mode 100644 full/src/test/java/apoc/util/ExtendedUtilTest.java diff --git a/docs/asciidoc/modules/ROOT/pages/ml/openai.adoc b/docs/asciidoc/modules/ROOT/pages/ml/openai.adoc index f7ff49dd5a..5592ca59f7 100644 --- a/docs/asciidoc/modules/ROOT/pages/ml/openai.adoc +++ b/docs/asciidoc/modules/ROOT/pages/ml/openai.adoc @@ -28,6 +28,9 @@ If present, they take precedence over the analogous APOC configs. | endpoint | analogous to `apoc.ml.openai.url` APOC config | apiVersion | analogous to `apoc.ml.azure.api.version` APOC config | failOnError | If true (default), the procedure fails in case of empty, blank or null input +| enableBackOffRetries | If set to true, enables the backoff retry strategy for handling failures. (default: false) +| backOffRetries | Sets the maximum number of retry attempts before the operation throws an exception. (default: 5) +| exponentialBackoff | If set to true, applies an exponential progression to the wait time between retries. If set to false, the wait time increases linearly. (default: false) |=== diff --git a/full/src/main/java/apoc/ml/OpenAI.java b/full/src/main/java/apoc/ml/OpenAI.java index 8a585cbddc..28f554e940 100644 --- a/full/src/main/java/apoc/ml/OpenAI.java +++ b/full/src/main/java/apoc/ml/OpenAI.java @@ -12,6 +12,7 @@ import apoc.ApocConfig; import apoc.Extended; import apoc.result.MapResult; +import apoc.util.ExtendedUtil; import apoc.util.JsonUtil; import apoc.util.Util; import com.fasterxml.jackson.core.JsonProcessingException; @@ -35,7 +36,11 @@ @Extended public class OpenAI { + public static final String JSON_PATH_CONF_KEY = "jsonPath"; public static final String FAIL_ON_ERROR_CONF = "failOnError"; + public static final String ENABLE_BACK_OFF_RETRIES_CONF_KEY = "enableBackOffRetries"; + public static final String ENABLE_EXPONENTIAL_BACK_OFF_CONF_KEY = "exponentialBackoff"; + public static final String BACK_OFF_RETRIES_CONF_KEY = "backOffRetries"; @Context public ApocConfig apocConfig; @@ -63,6 +68,9 @@ static Stream executeRequest( ApocConfig apocConfig) throws JsonProcessingException, MalformedURLException { apiKey = (String) configuration.getOrDefault(APIKEY_CONF_KEY, apocConfig.getString(APOC_OPENAI_KEY, apiKey)); + boolean enableBackOffRetries = Util.toBoolean(configuration.get(ENABLE_BACK_OFF_RETRIES_CONF_KEY)); + Integer backOffRetries = Util.toInteger(configuration.getOrDefault(BACK_OFF_RETRIES_CONF_KEY, 5)); + boolean exponentialBackoff = Util.toBoolean(configuration.get(ENABLE_EXPONENTIAL_BACK_OFF_CONF_KEY)); if (apiKey == null || apiKey.isBlank()) throw new IllegalArgumentException("API Key must not be empty"); String apiTypeString = (String) configuration.getOrDefault( API_TYPE_CONF_KEY, apocConfig.getString(APOC_ML_OPENAI_TYPE, OpenAIRequestHandler.Type.OPENAI.name())); @@ -83,6 +91,7 @@ static Stream executeRequest( OpenAIRequestHandler apiType = type.get(); final Map headers = new HashMap<>(); + String sJsonPath = (String) configuration.getOrDefault(JSON_PATH_CONF_KEY, jsonPath); headers.put("Content-Type", "application/json"); apiType.addApiKey(headers, apiKey); @@ -93,7 +102,14 @@ static Stream executeRequest( // eg: https://my-resource.openai.azure.com/openai/deployments/apoc-embeddings-model // therefore is better to join the not-empty path pieces var url = apiType.getFullUrl(path, configuration, apocConfig); - return JsonUtil.loadJson(url, headers, payload, jsonPath, true, List.of()); + return ExtendedUtil.withBackOffRetries( + () -> JsonUtil.loadJson(url, headers, payload, sJsonPath, true, List.of()), + enableBackOffRetries, + backOffRetries, + exponentialBackoff, + exception -> { + if (!exception.getMessage().contains("429")) throw new RuntimeException(exception); + }); } @Procedure("apoc.ml.openai.embedding") diff --git a/full/src/main/java/apoc/util/ExtendedUtil.java b/full/src/main/java/apoc/util/ExtendedUtil.java new file mode 100644 index 0000000000..8dba886ace --- /dev/null +++ b/full/src/main/java/apoc/util/ExtendedUtil.java @@ -0,0 +1,58 @@ +package apoc.util; + +import java.time.Duration; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public class ExtendedUtil { + public static T withBackOffRetries( + Supplier func, + boolean retry, + int backoffRetry, + boolean exponential, + Consumer exceptionHandler) { + T result; + backoffRetry = backoffRetry < 1 ? 5 : backoffRetry; + int countDown = backoffRetry; + exceptionHandler = Objects.requireNonNullElse(exceptionHandler, exe -> {}); + while (true) { + try { + result = func.get(); + break; + } catch (Exception e) { + if (!retry || countDown < 1) throw e; + exceptionHandler.accept(e); + countDown--; + long delay = getDelay(backoffRetry, countDown, exponential); + backoffSleep(delay); + } + } + return result; + } + + private static void backoffSleep(long millis) { + sleep(millis, "Operation interrupted during backoff"); + } + + public static void sleep(long millis, String interruptedMessage) { + try { + Thread.sleep(millis); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(interruptedMessage, ie); + } + } + + private static long getDelay(int backoffRetry, int countDown, boolean exponential) { + int backOffTime = backoffRetry - countDown; + long sleepMultiplier = exponential + ? (long) Math.pow(2, backOffTime) + : // Exponential retry progression + backOffTime; // Linear retry progression + return Math.min( + Duration.ofSeconds(1).multipliedBy(sleepMultiplier).toMillis(), + Duration.ofSeconds(30).toMillis() // Max 30s + ); + } +} diff --git a/full/src/test/java/apoc/util/ExtendedUtilTest.java b/full/src/test/java/apoc/util/ExtendedUtilTest.java new file mode 100644 index 0000000000..20790ca38e --- /dev/null +++ b/full/src/test/java/apoc/util/ExtendedUtilTest.java @@ -0,0 +1,108 @@ +package apoc.util; + +import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class ExtendedUtilTest { + + private static int i = 0; + + @Test + public void testWithLinearBackOffRetriesWithSuccess() { + i = 0; + long start = System.currentTimeMillis(); + int result = ExtendedUtil.withBackOffRetries( + this::testFunction, + true, + -1, // test backoffRetry default value -> 5 + false, + runEx -> { + if (!runEx.getMessage().contains("Expected")) throw new RuntimeException("Some Bad News..."); + }); + long time = System.currentTimeMillis() - start; + + assertEquals(4, result); + + // The method will attempt to execute the operation with a linear backoff strategy, + // sleeping for 1 second, 2 seconds, and 3 seconds between retries. + // This results in a total wait time of 6 seconds (1s + 2s + 3s + 4s) if the operation succeeds on the third + // attempt, + // leading to an approximate execution time of 6 seconds. + assertTrue("Current time is: " + time, time > 9000 && time < 11000); + } + + @Test + public void testWithExponentialBackOffRetriesWithSuccess() { + i = 0; + long start = System.currentTimeMillis(); + int result = ExtendedUtil.withBackOffRetries( + this::testFunction, + true, + 0, // test backoffRetry default value -> 5 + true, + runEx -> {}); + long time = System.currentTimeMillis() - start; + + assertEquals(4, result); + + // The method will attempt to execute the operation with an exponential backoff strategy, + // sleeping for 2 second, 4 seconds, and 8 seconds between retries. + // This results in a total wait time of 30 seconds (2s + 4s + 8s + 16s) if the operation succeeds on the third + // attempt, + // leading to an approximate execution time of 14 seconds. + assertTrue("Current time is: " + time, time > 29000 && time < 31000); + } + + @Test + public void testBackOffRetriesWithError() { + i = 0; + long start = System.currentTimeMillis(); + assertThrows( + RuntimeException.class, + () -> ExtendedUtil.withBackOffRetries(this::testFunction, true, 2, false, runEx -> {})); + long time = System.currentTimeMillis() - start; + + // The method is configured to retry the operation twice. + // So, it will make two extra-attempts, waiting for 1 second and 2 seconds before failing and throwing an + // exception. + // Resulting in an approximate execution time of 3 seconds. + assertTrue("Current time is: " + time, time > 2000 && time < 4000); + } + + @Test + public void testBackOffRetriesWithErrorAndExponential() { + i = 0; + long start = System.currentTimeMillis(); + assertThrows( + RuntimeException.class, + () -> ExtendedUtil.withBackOffRetries(this::testFunction, true, 2, true, runEx -> {})); + long time = System.currentTimeMillis() - start; + + // The method is configured to retry the operation twice. + // So, it will make two extra-attempts, waiting for 2 second and 4 seconds before failing and throwing an + // exception. + // Resulting in an approximate execution time of 6 seconds. + assertTrue("Current time is: " + time, time > 5000 && time < 7000); + } + + @Test + public void testWithoutBackOffRetriesWithError() { + i = 0; + assertThrows( + RuntimeException.class, + () -> ExtendedUtil.withBackOffRetries(this::testFunction, false, 30, false, runEx -> {})); + + // Retry strategy is not active and the testFunction is executed only once by raising an exception. + assertEquals(1, i); + } + + private int testFunction() { + if (i == 4) { + return i; + } + i++; + throw new RuntimeException("Expected i not equal to 4"); + } +} From 6ebd0ac8f05707738133fca3cfd58f2b270ec086 Mon Sep 17 00:00:00 2001 From: Giuseppe Villani Date: Mon, 20 Jan 2025 16:53:00 +0100 Subject: [PATCH 2/3] [NOID] Fixes #4246: Add apoc.node.match and apoc.relationship.match procedures (#4277) (#4330) * Fixes #4246: Add apoc.node.match and apoc.relationship.match procedures * fix tests * fix tests * changed procedure naming * fix apache commons error and added docs with load json --- docs/asciidoc/modules/ROOT/nav.adoc | 1 + .../modules/ROOT/pages/misc/index.adoc | 1 + .../ROOT/pages/misc/match-entities.adoc | 366 ++++++++++++++++++ .../java/apoc/entities/EntitiesExtended.java | 131 +++++++ .../main/java/apoc/nodes/NodesExtended.java | 57 --- .../src/main/java/apoc/util/ExtendedUtil.java | 9 + full/src/main/resources/extended.txt | 2 + .../apoc/entities/EntitiesExtendedTest.java | 355 +++++++++++++++++ .../java/apoc/nodes/NodesExtendedTest.java | 120 ------ .../apoc/periodic/PeriodicExtendedTest.java | 4 +- 10 files changed, 867 insertions(+), 179 deletions(-) create mode 100644 docs/asciidoc/modules/ROOT/pages/misc/match-entities.adoc create mode 100644 full/src/main/java/apoc/entities/EntitiesExtended.java delete mode 100644 full/src/main/java/apoc/nodes/NodesExtended.java create mode 100644 full/src/test/java/apoc/entities/EntitiesExtendedTest.java delete mode 100644 full/src/test/java/apoc/nodes/NodesExtendedTest.java diff --git a/docs/asciidoc/modules/ROOT/nav.adoc b/docs/asciidoc/modules/ROOT/nav.adoc index 6128a5ddf5..479f13bec5 100644 --- a/docs/asciidoc/modules/ROOT/nav.adoc +++ b/docs/asciidoc/modules/ROOT/nav.adoc @@ -162,6 +162,7 @@ include::partial$generated-documentation/nav.adoc[] ** xref::misc/spatial.adoc[] ** xref::misc/static-values.adoc[] ** xref::misc/utility-functions.adoc[] + ** xref::misc/match-entities.adoc[] * xref:indexes/index.adoc[] ** xref::indexes/schema-index-operations.adoc[] diff --git a/docs/asciidoc/modules/ROOT/pages/misc/index.adoc b/docs/asciidoc/modules/ROOT/pages/misc/index.adoc index 72f191efac..9d560a7cca 100644 --- a/docs/asciidoc/modules/ROOT/pages/misc/index.adoc +++ b/docs/asciidoc/modules/ROOT/pages/misc/index.adoc @@ -10,6 +10,7 @@ Cypher brings along some basic functions for math, text, collections and maps. * xref::misc/spatial.adoc[] * xref::misc/static-values.adoc[] * xref::misc/utility-functions.adoc[] +* xref::misc/match-entities.adoc[] diff --git a/docs/asciidoc/modules/ROOT/pages/misc/match-entities.adoc b/docs/asciidoc/modules/ROOT/pages/misc/match-entities.adoc new file mode 100644 index 0000000000..9af058289c --- /dev/null +++ b/docs/asciidoc/modules/ROOT/pages/misc/match-entities.adoc @@ -0,0 +1,366 @@ +[[match-entities]] += Match entities +:description: This section describes procedures and functions for matching entities. + +The library provides 2 procedure for matching entities: + +- apoc.node.match +- apoc.rel.match + +[[matching-node]] +== Matching nodes + +[.emphasis] +"apoc.node.match(['Label'], identProps:{key:value, ...}, onMatchProps:{key:value,...})" - match nodes with dynamic labels, with support for setting properties on matched nodes + +=== Signature + +[source] +---- +apoc.node.match(label :: LIST? OF STRING?, identProps :: MAP?, onMatchProps = {} :: MAP?) :: (node :: NODE?) +---- + +=== Input parameters +[.procedures, opts=header] +|=== +| Name | Type | Default | Description +| labels | LIST? OF STRING? | null | The list of labels used for the generated MATCH statement. Passing `null` or an empty list will match a node without any constraints on labels. `null` or empty strings within the list are not supported. +| identProps | MAP? | null | Properties that are used for MATCH statement. +| onMatchProps | MAP? | {} | Properties that are set when a node is matched. +|=== + +=== Output parameters +[.procedures, opts=header] +|=== +| Name | Type +|node|NODE? +|=== + +This procedure provides a more flexible and performant way of matching nodes than Cypher's https://neo4j.com/docs/cypher-manual/current/clauses/match/[`MATCH`^] clause. + +=== Usage Examples +The example below shows equivalent ways of matching a node with the `Person` label, with a `name` property of "Billy Reviewer": + +// tag::tabs[] +[.tabs] + +.apoc.node.match +[source,cypher] +---- +CALL apoc.node.match( + ["Person"], + {name: "Billy Reviewer"}, + {lastSeen: datetime()} +) +YIELD node +RETURN node; +---- + +.MATCH clause +[source,cypher] +---- +MATCH (node:Person {name: "Billy Reviewer"}) +SET node.lastSeen = datetime() +RETURN node; +---- +// end::tabs[] + +.Results +[opts="header"] +|=== +| node +| (:Person {name: "Billy Reviewer", lastSeen: 2020-11-24T11:33:39.319Z}) +|=== + +But this procedure is mostly useful for matching nodes that have dynamic labels or properties. +For example, we might want to create a node with labels or properties passed in as parameters. + +The following creates `labels` and `properties` parameters: + +[source,cypher] +---- +:param labels => (["Person"]); +:param identityProperties => ({name: "Billy Reviewer"}); +:param onMatchProperties => ({placeOfBirth: "Stars of the milky way, Always at the time of sunrise."}); +---- + +The following match a node with labels and properties based on `labels` and `identityProperties`, furthermore sets a new property based on `onMatchProperties`: + +[source,cypher] +---- +CALL apoc.node.match($labels, $identityProperties, $onMatchProperties) +YIELD node +RETURN node; +---- + +.Results +[opts="header"] +|=== +| node +| (:Person {name: "Billy Reviewer", lastSeen: 2020-11-24T11:33:39.319Z, placeOfBirth: "Stars of the milky way, Always at the time of sunrise."}) +|=== + + +In addition, we can use the `apoc.node.match` along with the `apoc.load.json` to dynamically set nodes starting from a JSON. + +For example, given the following dataset: + +[source,cypher] +---- +CREATE (giacomo:Person:Actor {name: 'Giacomino Poretti'}), + (aldo:Person:Actor {name: 'Cataldo Baglio'}), + (giovanni:Person:Actor {name: 'Giovanni Storti'}) +---- + +and the following `all.json` file: + +[source,json] +---- +[ + { + "labels":[ + "Person", + "Actor" + ], + "matchProps":{ + "name":"Giacomino Poretti" + }, + "setProps":{ + "bio":"Giacomo Poretti was born on April 26 1956 in Busto Garolfo", + "Alias":"Tafazzi" + } + }, + { + "labels":[ + "Person", + "Actor" + ], + "matchProps":{ + "name":"Giovanni Storti" + }, + "setProps":{ + "bio":"Giovanni Storti was born ...", + "Alias":"Rezzonico" + } + }, + { + "labels":[ + "Person", + "Actor" + ], + "matchProps":{ + "name":"Cataldo Baglio" + }, + "setProps":{ + "bio":"Cataldo Baglio was born somewhere", + "Alias":"Ajeje" + } + } +] + +---- + + +we can execute the following query to MATCH and SET the `Person:Actor` nodes: + +[source,cypher] +---- +CALL apoc.load.json("all.json") YIELD value +WITH value +CALL apoc.node.match(value.labels, value.matchProps, value.setProps) +YIELD node +RETURN node +---- + +.Results +[opts="header"] +|=== +| node +| (:Actor:Person {name: "Giacomino Poretti",bio: "Giacomo Poretti was born on April 26 1956 in Busto Garolfo",Alias: "Tafazzi"}) +| (:Actor:Person {name: "Giovanni Storti",bio: "Giovanni Storti was born ...",Alias: "Rezzonico"}) +| (:Actor:Person {name: "Cataldo Baglio",bio: "Cataldo Baglio was born somewhere",Alias: "Ajeje"}) +|=== + + + +[[matching-relationship]] +== Matching relationships + +[.emphasis] +"apoc.rel.match(startNode, relType, identProps:{key:value, ...}, endNode, onMatchProps:{key:value, ...})" - match relationship with dynamic type, with support for setting properties on match + +=== Signature + +[source] +---- +apoc.rel.match(startNode :: NODE?, relationshipType :: STRING?, identProps :: MAP?, endNode :: NODE?, onMatchProps = {} :: MAP?) :: (rel :: RELATIONSHIP?) +---- + +=== Input parameters +[.procedures, opts=header] +|=== +| Name | Type | Default | Description +| startNode | NODE? | null | Start node of the MATCH pattern. +| relationshipType | STRING? | null | Relationship type of the MATCH pattern. +| identProps | MAP? | null | Properties on the relationships that are used for MATCH statement. +| endNode | NODE? | null | End node of the MATCH pattern. +| onMatchProps | MAP? | {} | Properties that are set when the relationship is matched. +|=== + +=== Output parameters +[.procedures, opts=header] +|=== +| Name | Type +|rel|RELATIONSHIP? +|=== + +=== Usage Examples + +The examples in this section are based on the following graph: + +[source,cypher] +---- +CREATE (p:Person {name: "Billy Reviewer"}) +CREATE (m:Movie {title:"spooky and goofy movie"}) +CREATE (p)-[REVIEW {lastSeen: date("1984-12-21")}]->(m); +---- + +This procedure provides a more flexible and performant way of matching relationships than Cypher's https://neo4j.com/docs/cypher-manual/current/clauses/match/[`MATCH`^] clause. + +The example below shows equivalent ways of matching an `REVIEW` relationship between the `Billy Reviewer` and a Movie nodes: + +// tag::tabs[] +[.tabs] + +.apoc.rel.match +[source,cypher] +---- +MATCH (p:Person {name: "Billy Reviewer"}) +MATCH (m:Movie {title:"spooky and goofy movie"}) +CALL apoc.rel.match( + p, "REVIEW", + {lastSeen: date("1984-12-21")}, + m, {rating: 9.5} +) +YIELD rel +RETURN rel; +---- + +.MATCH clause +[source,cypher] +---- +MATCH (p:Person {name: "Billy Reviewer"}) +MATCH (m:Movie {title:"spooky and goofy movie"}) +MATCH (p)-[rel:REVIEW {lastSeen: date("1984-12-21")}]->(m) +SET rel.rating = 9.5 +RETURN rel; +---- +// end::tabs[] + +If we run these queries, we'll see output as shown below: + +.Results +[opts="header"] +|=== +| rel +| [:REVIEW {lastSeen: 1984-12-21, rating: 9.5}] +|=== + +But this procedure is mostly useful for matching relationships that have a dynamic relationship type or dynamic properties. +For example, we might want to match a relationship with a type or properties passed in as parameters. + +The following creates `relationshipType` and `properties` parameters: + +[source,cypher] +---- +:param relType => ("REVIEW"); +:param identityProperties => ({lastSeen: date("1984-12-21")}); +---- + +The following match a relationship with a type and properties based on the previously defined parameters: + +[source,cypher] +---- +MATCH (bill:Person {name: "Billy Reviewer"}) +MATCH (movie:Movie {title:"spooky and goofy movie"}) +CALL apoc.rel.match(bill, $relType, $identityProperties, movie, {}}) +YIELD rel +RETURN rel; +---- + +.Results +[opts="header"] +|=== +| rel +| [:REVIEW {lastSeen: 1984-12-21, rating: 9.5}] +|=== + + +In addition, we can use the `apoc.rel.match` along with the `apoc.load.json` to dynamically set nodes starting from a JSON. + +For example, given the following dataset: + +[source,cypher] +---- +CREATE (giacomo:Person:Actor {name: 'Giacomino Poretti'}), + (aldo:Person:Actor {name: 'Cataldo Baglio'}), + (m:Movie {title: 'Three Men and a Leg', `y:ear`: 1997, `mean-rating`: 8, `release date`: date('1997-12-27')}) +WITH aldo, m +CREATE (aldo)-[:ACTED_IN {role: 'Aldo'}]->(m), + (aldo)-[:DIRECTED {role: 'Director'}]->(m) +---- + +and the following `all.json` file +(note that it leverage the elementId of start and end nodes, therefore the values are mutable): + +[source,json] +---- +[ + { + "startNodeId": "4:b3d54d7b-2c64-4994-9a26-0bb2aa175291:0", + "endNodeId": "4:b3d54d7b-2c64-4994-9a26-0bb2aa175291:0", + "type":"ACTED_IN", + "matchProps":{ + "role":"Aldo" + }, + "setProps":{ + "ajeje":"Brazorf", + "conte":"Dracula" + } + }, + { + "startNodeId": "4:b3d54d7b-2c64-4994-9a26-0bb2aa175291:0", + "endNodeId": "4:b3d54d7b-2c64-4994-9a26-0bb2aa175291:0", + "type":"DIRECTED", + "matchProps":{ + "role":"Director" + }, + "setProps":{ + "description": "did stuff..", + "alias":"i dunnoaaaaaa" + } + } +] +---- + + +we can execute the following query to MATCH and SET the relationships: + +[source,cypher] +---- +CALL apoc.load.json("all.json") YIELD value +WITH value +WHERE elementId(start) = value.startNodeId AND elementId(end) = value.endNodeId +CALL apoc.rel.match(start, value.type, value.matchProps, end, value.setProps) +YIELD rel +RETURN rel +---- + +.Results +[opts="header"] +|=== +| rel +| [:ACTED_IN {role: "Aldo",conte: "Dracula",ajeje: "Brazorf"}] +| (:Actor:Person {name: "Giovanni Storti",bio: "Giovanni Storti was born ...",Alias: "Rezzonico"}) +| [:DIRECTED {bio: "did stuff..",alias: "i dunno",role: "Director"}] +|=== diff --git a/full/src/main/java/apoc/entities/EntitiesExtended.java b/full/src/main/java/apoc/entities/EntitiesExtended.java new file mode 100644 index 0000000000..d6aca96503 --- /dev/null +++ b/full/src/main/java/apoc/entities/EntitiesExtended.java @@ -0,0 +1,131 @@ +package apoc.entities; + +import apoc.Extended; +import apoc.result.NodeResult; +import apoc.result.RelationshipResult; +import apoc.util.EntityUtil; +import apoc.util.ExtendedUtil; +import apoc.util.Util; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.neo4j.graphdb.*; +import org.neo4j.procedure.*; + +@Extended +public class EntitiesExtended { + + public static final String INVALID_LABELS_MESSAGE = + "The list of label names may not contain any `NULL` or empty `STRING` values. If you wish to match a `NODE` without a label, pass an empty list instead."; + public static final String INVALID_IDENTIFY_PROPERTY_MESSAGE = + "you need to supply at least one identifying property for a match"; + public static final String INVALID_REL_TYPE_MESSAGE = + "It is not possible to match a `RELATIONSHIP` without a `RELATIONSHIP` type."; + + @Context + public Transaction tx; + + @UserFunction("apoc.node.rebind") + @Description("apoc.node.rebind(node - to rebind a node (i.e. executing a Transaction.getNodeById(node.getId()) ") + public Node nodeRebind(@Name("node") Node node) { + return Util.rebind(tx, node); + } + + @UserFunction("apoc.rel.rebind") + @Description( + "apoc.rel.rebind(rel) - to rebind a rel (i.e. executing a Transaction.getRelationshipById(rel.getId()) ") + public Relationship relationshipRebind(@Name("rel") Relationship rel) { + return Util.rebind(tx, rel); + } + + @UserFunction("apoc.any.rebind") + @Description( + "apoc.any.rebind(Object) - to rebind any rel, node, path, map, list or combination of them (i.e. executing a Transaction.getNodeById(node.getId()) / Transaction.getRelationshipById(rel.getId()))") + public Object anyRebind(@Name("any") Object any) { + return EntityUtil.anyRebind(tx, any); + } + + @Procedure(value = "apoc.node.match", mode = Mode.WRITE) + @Description("Matches the given `NODE` values with the given dynamic labels.") + public Stream nodes( + @Name(value = "labels") List labelNames, + @Name(value = "identProps") Map identProps, + @Name(value = "onMatchProps", defaultValue = "{}") Map onMatchProps) { + /* + * Partially taken from apoc.merge.nodes, modified to perform a match instead of a merge. + */ + final Result nodeResult = getNodeResult(labelNames, identProps, onMatchProps); + return nodeResult.columnAs("n").stream().map(node -> new NodeResult((Node) node)); + } + + @Procedure(value = "apoc.rel.match", mode = Mode.WRITE) + @Description("Matches the given `RELATIONSHIP` values with the given dynamic types/properties.") + public Stream relationship( + @Name(value = "startNode") Node startNode, + @Name(value = "relType") String relType, + @Name(value = "identProps") Map identProps, + @Name(value = "endNode") Node endNode, + @Name(value = "onMatchProps", defaultValue = "{}") Map onMatchProps) { + /* + * Partially taken from apoc.merge.relationship, modified to perform a match instead of a merge. + */ + final Result execute = getRelResult(startNode, relType, identProps, endNode, onMatchProps); + return execute.columnAs("r").stream().map(rel -> new RelationshipResult((Relationship) rel)); + } + + private Result getRelResult( + Node startNode, + String relType, + Map identProps, + Node endNode, + Map onMatchProps) { + String identPropsString = buildIdentPropsString(identProps); + onMatchProps = Objects.requireNonNullElse(onMatchProps, Util.map()); + + if (StringUtils.isBlank(relType)) { + throw new IllegalArgumentException(INVALID_REL_TYPE_MESSAGE); + } + + Map params = Util.map( + "identProps", identProps, "onMatchProps", onMatchProps, "startNode", startNode, "endNode", endNode); + + final String cypher = "WITH $startNode as startNode, $endNode as endNode " + + "MATCH (startNode)-[r:" + Util.quote(relType) + "{" + identPropsString + "}]->(endNode) " + + "SET r+= $onMatchProps " + + "RETURN r"; + return tx.execute(cypher, params); + } + + private Result getNodeResult( + List labelNames, Map identProps, Map onMatchProps) { + onMatchProps = Objects.requireNonNullElse(onMatchProps, Util.map()); + labelNames = Objects.requireNonNullElse(labelNames, Collections.EMPTY_LIST); + + if (MapUtils.isEmpty(identProps)) { + throw new IllegalArgumentException(INVALID_IDENTIFY_PROPERTY_MESSAGE); + } + + boolean containsInvalidLabels = labelNames.stream().anyMatch(label -> StringUtils.isBlank(label)); + if (containsInvalidLabels) { + throw new IllegalArgumentException(INVALID_LABELS_MESSAGE); + } + + Map params = Util.map("identProps", identProps, "onMatchProps", onMatchProps); + String identPropsString = buildIdentPropsString(identProps); + + final String cypher = "MATCH (n" + ExtendedUtil.joinStringLabels(labelNames) + " {" + identPropsString + "}) " + + "SET n += $onMatchProps " + + "RETURN n"; + return tx.execute(cypher, params); + } + + private String buildIdentPropsString(Map identProps) { + if (identProps == null) return ""; + return identProps.keySet().stream() + .map(Util::quote) + .map(s -> s + ":$identProps." + s) + .collect(Collectors.joining(",")); + } +} diff --git a/full/src/main/java/apoc/nodes/NodesExtended.java b/full/src/main/java/apoc/nodes/NodesExtended.java deleted file mode 100644 index 4b6d5bac51..0000000000 --- a/full/src/main/java/apoc/nodes/NodesExtended.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package apoc.nodes; - -import apoc.Extended; -import apoc.util.EntityUtil; -import apoc.util.Util; -import org.neo4j.graphdb.Node; -import org.neo4j.graphdb.Relationship; -import org.neo4j.graphdb.Transaction; -import org.neo4j.procedure.Context; -import org.neo4j.procedure.Description; -import org.neo4j.procedure.Name; -import org.neo4j.procedure.UserFunction; - -@Extended -public class NodesExtended { - - @Context - public Transaction tx; - - @UserFunction("apoc.node.rebind") - @Description("apoc.node.rebind(node - to rebind a node (i.e. executing a Transaction.getNodeById(node.getId()) ") - public Node nodeRebind(@Name("node") Node node) { - return Util.rebind(tx, node); - } - - @UserFunction("apoc.rel.rebind") - @Description( - "apoc.rel.rebind(rel) - to rebind a rel (i.e. executing a Transaction.getRelationshipById(rel.getId()) ") - public Relationship relationshipRebind(@Name("rel") Relationship rel) { - return Util.rebind(tx, rel); - } - - @UserFunction("apoc.any.rebind") - @Description( - "apoc.any.rebind(Object) - to rebind any rel, node, path, map, list or combination of them (i.e. executing a Transaction.getNodeById(node.getId()) / Transaction.getRelationshipById(rel.getId()))") - public Object anyRebind(@Name("any") Object any) { - return EntityUtil.anyRebind(tx, any); - } -} diff --git a/full/src/main/java/apoc/util/ExtendedUtil.java b/full/src/main/java/apoc/util/ExtendedUtil.java index 8dba886ace..63f7b1b5f2 100644 --- a/full/src/main/java/apoc/util/ExtendedUtil.java +++ b/full/src/main/java/apoc/util/ExtendedUtil.java @@ -1,9 +1,12 @@ package apoc.util; import java.time.Duration; +import java.util.Collection; import java.util.Objects; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.commons.collections4.CollectionUtils; public class ExtendedUtil { public static T withBackOffRetries( @@ -55,4 +58,10 @@ private static long getDelay(int backoffRetry, int countDown, boolean exponentia Duration.ofSeconds(30).toMillis() // Max 30s ); } + + public static String joinStringLabels(Collection labels) { + return CollectionUtils.isNotEmpty(labels) + ? ":" + labels.stream().map(Util::quote).collect(Collectors.joining(":")) + : ""; + } } diff --git a/full/src/main/resources/extended.txt b/full/src/main/resources/extended.txt index 914a3fd03e..12a215a6b3 100644 --- a/full/src/main/resources/extended.txt +++ b/full/src/main/resources/extended.txt @@ -207,6 +207,8 @@ apoc.coll.fillObject apoc.data.email apoc.node.rebind apoc.rel.rebind +apoc.node.match +apoc.rel.match apoc.static.get apoc.static.getAll apoc.trigger.nodesByLabel diff --git a/full/src/test/java/apoc/entities/EntitiesExtendedTest.java b/full/src/test/java/apoc/entities/EntitiesExtendedTest.java new file mode 100644 index 0000000000..f0dc3bd2fa --- /dev/null +++ b/full/src/test/java/apoc/entities/EntitiesExtendedTest.java @@ -0,0 +1,355 @@ +package apoc.entities; + +import static apoc.entities.EntitiesExtended.*; +import static apoc.util.TestUtil.*; +import static java.util.Collections.singletonList; +import static org.junit.Assert.*; + +import apoc.create.Create; +import apoc.meta.Meta; +import apoc.util.MapUtil; +import apoc.util.TestUtil; +import apoc.util.Util; +import java.time.LocalDate; +import java.util.*; +import java.util.stream.Collectors; +import org.junit.*; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.graphdb.*; +import org.neo4j.internal.helpers.collection.Iterables; +import org.neo4j.test.rule.DbmsRule; +import org.neo4j.test.rule.ImpermanentDbmsRule; + +public class EntitiesExtendedTest { + + @Rule + public DbmsRule db = new ImpermanentDbmsRule() + .withSetting(GraphDatabaseSettings.procedure_unrestricted, singletonList("apoc.*")); + + @Before + public void setUp() throws Exception { + TestUtil.registerProcedure(db, EntitiesExtended.class, Create.class, Meta.class); + seedGraph(); + } + + @After + public void teardown() { + db.shutdown(); + } + + @Test + public void rebind() { + TestUtil.testCall( + db, + "CREATE (a:Foo)-[r1:MY_REL]->(b:Bar)-[r2:ANOTHER_REL]->(c:Baz) WITH a,b,c,r1,r2 \n" + + "RETURN apoc.any.rebind({first: a, second: b, third: c, rels: [r1, r2]}) as rebind", + (row) -> { + final Map rebind = (Map) row.get("rebind"); + final List rels = (List) rebind.get("rels"); + final Relationship firstRel = rels.get(0); + final Relationship secondRel = rels.get(1); + assertEquals(firstRel.getStartNode(), rebind.get("first")); + assertEquals(firstRel.getEndNode(), rebind.get("second")); + assertEquals(secondRel.getStartNode(), rebind.get("second")); + assertEquals(secondRel.getEndNode(), rebind.get("third")); + }); + + TestUtil.testCall( + db, + "CREATE p1=(a:Foo)-[r1:MY_REL]->(b:Bar), p2=(:Bar)-[r2:ANOTHER_REL]->(c:Baz) \n" + + "RETURN apoc.any.rebind([p1, p2]) as rebind", + (row) -> { + final List rebindList = (List) row.get("rebind"); + assertEquals(2, rebindList.size()); + final Path firstPath = rebindList.get(0); + assertFooBarPath(firstPath); + final Path secondPath = rebindList.get(1); + assertPath(secondPath, List.of("Bar", "Baz"), List.of("ANOTHER_REL")); + }); + + // check via `apoc.meta.cypher.type()` that, even if the return type is Object, + // the output of a rebound Path is also a Path (i.e.: `PATH NOT NULL`) + TestUtil.testCall( + db, + "CREATE path=(a:Foo)-[r1:MY_REL]->(b:Bar)\n" + "WITH apoc.any.rebind(path) AS rebind\n" + + "RETURN rebind, apoc.meta.cypher.type(rebind) as valueType", + (row) -> { + final String valueType = (String) row.get("valueType"); + assertEquals("PATH", valueType); + + final Path pathRebind = (Path) row.get("rebind"); + assertFooBarPath(pathRebind); + }); + } + + @Test + public void testMatchNode() { + testCall( + db, + "CALL apoc.node.match(['Actor','Person'],{name:'Giacomino Poretti'}, {bio:'Giacomo Poretti was born on April 26, 1956 in Busto Garolfo....'})", + (row) -> { + Node node = (Node) row.get("node"); + assertTrue(node.hasLabel(Label.label("Person"))); + assertTrue(node.hasLabel(Label.label("Actor"))); + assertEquals("Giacomino Poretti", node.getProperty("name")); + assertNotNull(node.getProperty("bio")); + }); + } + + @Test + public void testMatchWithoutLabel() { + testCall(db, "CALL apoc.node.match(null, {name:'Massimo Venier'}) YIELD node RETURN node", (row) -> { + Node node = (Node) row.get("node"); + assertEquals("Massimo Venier", node.getProperty("name")); + }); + } + + @Test + public void testMatchWithoutOnMatchProps() { + String movieTitle = "Three Men and a Leg"; + testCall( + db, + "CALL apoc.node.match(['Movie'], $identProps, null) YIELD node RETURN node", + Util.map("identProps", Util.map("title", movieTitle)), + (row) -> { + Node node = (Node) row.get("node"); + assertEquals(movieTitle, node.getProperty("title")); + }); + } + + @Test + public void testMatchNodeWithEmptyLabelList() { + testCall(db, "CALL apoc.node.match([], {name:'Cataldo Baglio'}) YIELD node RETURN node", (row) -> { + Node node = (Node) row.get("node"); + assertEquals("Cataldo Baglio", node.getProperty("name")); + }); + } + + @Test + public void testMatchWithEmptyIdentityPropertiesShouldFail() { + Set.of("null", "{}") + .forEach(idProps -> failNodeMatchWithMessage( + () -> testCall( + db, + "CALL apoc.node.match(['Person']," + idProps + + ", {name:'John'}) YIELD node RETURN node", + row -> fail()), + IllegalArgumentException.class, + INVALID_IDENTIFY_PROPERTY_MESSAGE)); + } + + @Test + public void testMatchNodeWithNullLabelsShouldFail() { + failNodeMatchWithMessage( + () -> testCall(db, "CALL apoc.node.match([null], {name:'John'}) YIELD node RETURN node", row -> fail()), + IllegalArgumentException.class, + INVALID_LABELS_MESSAGE); + } + + @Test + public void testMatchNodeWithMixedLabelsContainingNullShouldFail() { + failNodeMatchWithMessage( + () -> testCall( + db, + "CALL apoc.node.match(['Person', null], {name:'John'}) YIELD node RETURN node", + row -> fail()), + IllegalArgumentException.class, + INVALID_LABELS_MESSAGE); + } + + @Test + public void testMatchNodeWithSingleEmptyLabelShouldFail() { + failNodeMatchWithMessage( + () -> testCall(db, "CALL apoc.node.match([''], {name:'John'}) YIELD node RETURN node", row -> fail()), + IllegalArgumentException.class, + INVALID_LABELS_MESSAGE); + } + + @Test + public void testMatchNodeContainingMixedLabelsContainingEmptyStringShouldFail() { + failNodeMatchWithMessage( + () -> testCall( + db, + "CALL apoc.node.match(['Person', ''], {name:'John'}) YIELD node RETURN node", + row -> fail()), + IllegalArgumentException.class, + INVALID_LABELS_MESSAGE); + } + + @Test + public void testEscapeIdentityPropertiesWithSpecialCharactersShouldWork() { + MapUtil.map( + "title", + "Three Men and a Leg", + "y:ear", + 1997L, + "mean-rating", + 8L, + "release date", + LocalDate.of(1997, 12, 27)) + .forEach((key, value) -> { + Map identProps = MapUtil.map(key, value); + Map params = MapUtil.map("identProps", identProps); + + testCall( + db, + "CALL apoc.node.match(['Movie'], $identProps) YIELD node RETURN node", + params, + (row) -> { + Node node = (Node) row.get("node"); + assertNotNull(node); + assertTrue(node.hasProperty(key)); + assertEquals(value, node.getProperty(key)); + }); + }); + } + + @Test + public void testLabelsWithSpecialCharactersShouldWork() { + for (String label : + new String[] {"Label with spaces", ":LabelWithColon", "label-with-dash", "LabelWithUmlautsÄÖÜ"}) { + db.executeTransactionally(String.format("CREATE (n:`%s` {id:1})", label)); + Map params = MapUtil.map("label", label); + testCall( + db, + "CALL apoc.node.match([$label],{id: 1}, {}) YIELD node RETURN node", + params, + row -> assertTrue(row.get("node") instanceof Node)); + db.executeTransactionally(String.format("MATCH (n:`%s`) DELETE n", label)); + } + } + + // RELATIONSHIP MATCH + + @Test + public void testMatchRelationships() { + testCall( + db, + "MATCH (aldo:Person{name:'Cataldo Baglio'}), (movie:Movie) " + "WITH aldo, movie " + + "CALL apoc.rel.match(aldo, 'ACTED_IN', {role:'Aldo'}, movie, {secondaryRoles: ['Ajeje Brazorf', 'Dracula']}) YIELD rel RETURN rel", + (row) -> { + Relationship rel = (Relationship) row.get("rel"); + assertEquals("ACTED_IN", rel.getType().name()); + assertEquals("Aldo", rel.getProperty("role")); + assertArrayEquals( + new String[] {"Ajeje Brazorf", "Dracula"}, (String[]) rel.getProperty("secondaryRoles")); + }); + } + + @Test + public void testMatchRelationshipsWithNullOnMatchProps() { + testCall( + db, + "MATCH (giacomino:Person{name:'Giacomino Poretti'}), (movie:Movie) " + "WITH giacomino, movie " + + "CALL apoc.rel.match(giacomino, 'ACTED_IN', {role:'Giacomo'}, movie, null) YIELD rel RETURN rel", + (row) -> { + Relationship rel = (Relationship) row.get("rel"); + assertEquals("ACTED_IN", rel.getType().name()); + assertEquals("Giacomo", rel.getProperty("role")); + }); + } + + @Test + public void testMatchRelationshipsWithNullIdentProps() { + testCall( + db, + "MATCH (giova:Person{name:'Giovanni Storti'}), (movie:Movie) " + "WITH giova, movie " + + "CALL apoc.rel.match(giova, 'ACTED_IN', null, movie, null) YIELD rel RETURN rel", + (row) -> { + Relationship rel = (Relationship) row.get("rel"); + assertEquals("ACTED_IN", rel.getType().name()); + assertEquals("Giovanni", rel.getProperty("role")); + }); + } + + @Test + public void testRelationshipTypesWithSpecialCharactersShouldWork() { + for (String relType : new String[] {"Reltype with space", ":ReltypeWithCOlon", "rel-type-with-dash"}) { + db.executeTransactionally(String.format("CREATE (:TestStart)-[:`%s`]->(:TestEnd)", relType)); + Map params = MapUtil.map("relType", relType); + testCall( + db, + "MATCH (s:TestStart), (e:TestEnd) " + "WITH s,e " + + "CALL apoc.rel.match(s, $relType, null, e, null) YIELD rel RETURN rel", + params, + row -> { + assertTrue(row.get("rel") instanceof Relationship); + }); + db.executeTransactionally("MATCH (n) WHERE n:TestStart OR n:TestEnd DETACH DELETE n"); + } + } + + @Test + public void testMatchRelWithNullRelTypeShouldFail() { + failEdgeMatchWithMessage( + () -> testCall( + db, + "MATCH (massimo:Director), (movie:Movie) " + "WITH massimo, movie " + + "CALL apoc.rel.match(massimo, null, null, null, movie) YIELD rel RETURN rel", + row -> fail()), + IllegalArgumentException.class, + INVALID_REL_TYPE_MESSAGE); + } + + @Test + public void testMergeWithEmptyRelTypeShouldFail() { + failEdgeMatchWithMessage( + () -> testCall( + db, + "MATCH (massimo:Director), (movie:Movie) " + "WITH massimo, movie " + + "CALL apoc.rel.match(massimo, '', null, null, movie) YIELD rel RETURN rel", + row -> fail()), + IllegalArgumentException.class, + INVALID_REL_TYPE_MESSAGE); + } + + private void assertFooBarPath(Path pathRebind) { + assertPath(pathRebind, List.of("Foo", "Bar"), List.of("MY_REL")); + } + + private void assertPath(Path rebind, List labels, List relTypes) { + final List actualLabels = Iterables.stream(rebind.nodes()) + .map(i -> i.getLabels().iterator().next()) + .map(Label::name) + .collect(Collectors.toList()); + assertEquals(labels, actualLabels); + final List actualRelTypes = Iterables.stream(rebind.relationships()) + .map(Relationship::getType) + .map(RelationshipType::name) + .collect(Collectors.toList()); + assertEquals(relTypes, actualRelTypes); + } + + private void seedGraph() { + try (Transaction tx = db.beginTx()) { + tx.execute("CREATE (giacomo:Person:Actor {name: 'Giacomino Poretti'}),\n" + + " (aldo:Person:Actor {name: 'Cataldo Baglio'}),\n" + + " (giovanni:Person:Actor {name: 'Giovanni Storti'}),\n" + + " (massimo:Person:Director {name: 'Massimo Venier'}),\n" + + " (m:Movie {title: 'Three Men and a Leg', `y:ear`: 1997, `mean-rating`: 8, `release date`: date('1997-12-27')})\n" + + "WITH aldo, giovanni, giacomo, massimo, m\n" + + "CREATE (aldo)-[:ACTED_IN {role: 'Aldo'}]->(m),\n" + + " (giovanni)-[:ACTED_IN {role: 'Giovanni'}]->(m),\n" + + " (giacomo)-[:ACTED_IN {role: 'Giacomo'}]->(m),\n" + + " (massimo)-[:DIRECTED]->(m)"); + tx.commit(); + } catch (RuntimeException e) { + throw e; + } + } + + private void failNodeMatchWithMessage(Runnable lambda, Class exceptionType, String message) { + failMatchWithMessage(lambda, exceptionType, message, "apoc.node.match"); + } + + private void failEdgeMatchWithMessage(Runnable lambda, Class exceptionType, String message) { + failMatchWithMessage(lambda, exceptionType, message, "apoc.rel.match"); + } + + private void failMatchWithMessage( + Runnable lambda, Class exceptionType, String message, String apoc) { + QueryExecutionException queryExecutionException = + Assert.assertThrows(QueryExecutionException.class, lambda::run); + assertError(queryExecutionException, message, exceptionType, apoc); + } +} diff --git a/full/src/test/java/apoc/nodes/NodesExtendedTest.java b/full/src/test/java/apoc/nodes/NodesExtendedTest.java deleted file mode 100644 index 657ac63ba8..0000000000 --- a/full/src/test/java/apoc/nodes/NodesExtendedTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package apoc.nodes; - -import static java.util.Collections.singletonList; -import static org.junit.Assert.assertEquals; - -import apoc.create.Create; -import apoc.meta.Meta; -import apoc.util.TestUtil; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.neo4j.configuration.GraphDatabaseSettings; -import org.neo4j.graphdb.Label; -import org.neo4j.graphdb.Path; -import org.neo4j.graphdb.Relationship; -import org.neo4j.graphdb.RelationshipType; -import org.neo4j.internal.helpers.collection.Iterables; -import org.neo4j.test.rule.DbmsRule; -import org.neo4j.test.rule.ImpermanentDbmsRule; - -public class NodesExtendedTest { - - @Rule - public DbmsRule db = new ImpermanentDbmsRule() - .withSetting(GraphDatabaseSettings.procedure_unrestricted, singletonList("apoc.*")); - - @Before - public void setUp() throws Exception { - TestUtil.registerProcedure(db, NodesExtended.class, Create.class, Meta.class); - } - - @After - public void teardown() { - db.shutdown(); - } - - @Test - public void rebind() { - TestUtil.testCall( - db, - "CREATE (a:Foo)-[r1:MY_REL]->(b:Bar)-[r2:ANOTHER_REL]->(c:Baz) WITH a,b,c,r1,r2 \n" - + "RETURN apoc.any.rebind({first: a, second: b, third: c, rels: [r1, r2]}) as rebind", - (row) -> { - final Map rebind = (Map) row.get("rebind"); - final List rels = (List) rebind.get("rels"); - final Relationship firstRel = rels.get(0); - final Relationship secondRel = rels.get(1); - assertEquals(firstRel.getStartNode(), rebind.get("first")); - assertEquals(firstRel.getEndNode(), rebind.get("second")); - assertEquals(secondRel.getStartNode(), rebind.get("second")); - assertEquals(secondRel.getEndNode(), rebind.get("third")); - }); - - TestUtil.testCall( - db, - "CREATE p1=(a:Foo)-[r1:MY_REL]->(b:Bar), p2=(:Bar)-[r2:ANOTHER_REL]->(c:Baz) \n" - + "RETURN apoc.any.rebind([p1, p2]) as rebind", - (row) -> { - final List rebindList = (List) row.get("rebind"); - assertEquals(2, rebindList.size()); - final Path firstPath = rebindList.get(0); - assertFooBarPath(firstPath); - final Path secondPath = rebindList.get(1); - assertPath(secondPath, List.of("Bar", "Baz"), List.of("ANOTHER_REL")); - }); - - // check via `apoc.meta.cypher.type()` that, even if the return type is Object, - // the output of a rebound Path is also a Path (i.e.: `PATH NOT NULL`) - TestUtil.testCall( - db, - "CREATE path=(a:Foo)-[r1:MY_REL]->(b:Bar)\n" + "WITH apoc.any.rebind(path) AS rebind\n" - + "RETURN rebind, apoc.meta.cypher.type(rebind) as valueType", - (row) -> { - final String valueType = (String) row.get("valueType"); - assertEquals("PATH", valueType); - - final Path pathRebind = (Path) row.get("rebind"); - assertFooBarPath(pathRebind); - }); - } - - private void assertFooBarPath(Path pathRebind) { - assertPath(pathRebind, List.of("Foo", "Bar"), List.of("MY_REL")); - } - - private void assertPath(Path rebind, List labels, List relTypes) { - final List actualLabels = Iterables.stream(rebind.nodes()) - .map(i -> i.getLabels().iterator().next()) - .map(Label::name) - .collect(Collectors.toList()); - assertEquals(labels, actualLabels); - final List actualRelTypes = Iterables.stream(rebind.relationships()) - .map(Relationship::getType) - .map(RelationshipType::name) - .collect(Collectors.toList()); - assertEquals(relTypes, actualRelTypes); - } -} diff --git a/full/src/test/java/apoc/periodic/PeriodicExtendedTest.java b/full/src/test/java/apoc/periodic/PeriodicExtendedTest.java index d706b725ad..3699f6a8f1 100644 --- a/full/src/test/java/apoc/periodic/PeriodicExtendedTest.java +++ b/full/src/test/java/apoc/periodic/PeriodicExtendedTest.java @@ -28,9 +28,9 @@ import static org.neo4j.test.assertion.Assert.assertEventually; import apoc.create.Create; +import apoc.entities.EntitiesExtended; import apoc.load.Jdbc; import apoc.nlp.gcp.GCPProcedures; -import apoc.nodes.NodesExtended; import apoc.util.TestUtil; import java.util.Collections; import java.util.Map; @@ -55,7 +55,7 @@ public void initDb() { TestUtil.registerProcedure( db, Periodic.class, - NodesExtended.class, + EntitiesExtended.class, GCPProcedures.class, Create.class, PeriodicExtended.class, From d43de1468a1fc8b71a34970c3b0fd613be7a195a Mon Sep 17 00:00:00 2001 From: Giuseppe Villani Date: Mon, 20 Jan 2025 18:14:09 +0100 Subject: [PATCH 3/3] [NOID] Fixes #3360: Add support for newer Elasticsearch search api (#4269) * [NOID] Fixes #3360: Add support for newer Elasticsearch search api * [NOID] cleanup and fix tests * [NOID] format changes --- .../database-integration/elasticsearch.adoc | 122 +++++- full-it/build.gradle | 1 + .../apoc/full/it}/es/ElasticSearchTest.java | 377 +++++------------- .../full/it/es/ElasticVersionEightTest.java | 88 ++++ .../full/it/es/ElasticVersionSevenTest.java | 202 ++++++++++ full/src/main/java/apoc/es/ElasticSearch.java | 186 ++------- .../java/apoc/es/ElasticSearchConfig.java | 11 + .../java/apoc/es/ElasticSearchHandler.java | 145 +++++++ 8 files changed, 705 insertions(+), 427 deletions(-) rename {full/src/test/java/apoc => full-it/src/test/java/apoc/full/it}/es/ElasticSearchTest.java (56%) create mode 100644 full-it/src/test/java/apoc/full/it/es/ElasticVersionEightTest.java create mode 100644 full-it/src/test/java/apoc/full/it/es/ElasticVersionSevenTest.java create mode 100644 full/src/main/java/apoc/es/ElasticSearchHandler.java diff --git a/docs/asciidoc/modules/ROOT/pages/database-integration/elasticsearch.adoc b/docs/asciidoc/modules/ROOT/pages/database-integration/elasticsearch.adoc index 6f783aa7ac..b7b8667ca4 100644 --- a/docs/asciidoc/modules/ROOT/pages/database-integration/elasticsearch.adoc +++ b/docs/asciidoc/modules/ROOT/pages/database-integration/elasticsearch.adoc @@ -23,6 +23,13 @@ include::example$generated-documentation/apoc.es.delete.adoc[] |=== // end::elasticsearch[] +[NOTE] +==== +It is currently not possible to query Elastic 8 via certificate, +but only disabling ssl with the configuration `"xpack.security.http.ssl.enabled=false"`, using the basic authentication via the header config (see `config parameter` below) +or (not recommended) disabling security via `xpack.security.enabled=false` +==== + == Example @@ -67,7 +74,7 @@ Here an example: [source,cypher] ---- // It's important to create an index to improve performance -CREATE INDEX ON :Document(id) +CREATE INDEX FOR (n:Document) ON (n.id) // First query: get first chunk of data + the scroll_id for pagination CALL apoc.es.query('localhost','test-index','test-type','name:Neo4j&size=1&scroll=5m',null) yield value with value._scroll_id as scrollId, value.hits.hits as hits // Do something with hits @@ -101,16 +108,71 @@ call apoc.es.post(host-or-key,index-or-null,type-or-null,id-or-null,query-or-nul === host-or-key parameter -The parameter can be a direct host or url, or an entry to be lookup up in apoc.conf +The parameter can be: * host * host:port +* username:password@host:port * http://host:port -* lookup via key to apoc.es..url -* lookup via key apoc.es..host +* http://username:password@host:port + +For example, by using the `apoc.es.stats`, we can execute: +[source, cypher] +---- +CALL apoc.es.stats('http://username:password@host:port') +---- + +Moreover, it can be an entry to be lookup up in `apoc.conf`: + * lookup apoc.es.url * lookup apoc.es.host +This takes precedence over the direct string host or url as the first parameter, as above. + +For example, with a `apoc.conf` like this: +``` +apoc.es.url=http://username:password@host:port +``` + +or like this : +``` +apoc.es.host=username:password@host:port +``` + +we can connect to elastic by putting null as the first parameter. + +For example, by using the `apoc.es.stats`, we can execute: +[source, cypher] +---- +CALL apoc.es.stats(null) +---- + +Furthermore, it can be an entry to be lookup up in `apoc.conf`, +where `` have be placed in the first parameter: + +* lookup via key to apoc.es..url +* lookup via key apoc.es..host + + +For example, with a `apoc.conf` like this: +``` +apoc.es.custom.url=http://username:password@host:port +``` + +or like this: +``` +apoc.es.custom.host=username:password@host:port +``` + +we can connect to elastic by putting null as the first parameter. + +For example, by using the `apoc.es.stats`, we can execute: +[source, cypher] +---- +CALL apoc.es.stats('custom') +---- + + === index parameter Main ES index, will be sent directly, if null then "_all" multiple indexes can be separated by comma in the string. @@ -140,8 +202,10 @@ Config can be an optional *map*, which can have the following entries: |=== | name | type | default | description | headers | `Map` | {`content-type`: "application/json", `method`, ""} | Contains a header map to add (or replace) the default one. - The `method: ` is needed by APOC to figure out under the hood, which http request method to pass. - That is, by default, it is `PUT` with the `apoc.es.put`, POST with the `apoc.es.post` and `apoc.es.postRaw`, and GET in other cases. +The `method: ` is needed by APOC to figure out under the hood, which http request method to pass. +That is, by default, it is `PUT` with the `apoc.es.put`, POST with the `apoc.es.post` and `apoc.es.postRaw`, and GET in other cases. +| version | `String` | `DEFAULT` | Can be `DEFAULT` and `EIGHT`, in order to change the RestAPI endpoint based on Elastic version. +See `Endpoint` table below. |=== @@ -151,7 +215,7 @@ For example, by using the `apoc.es.stats`, we can execute: CALL apoc.es.stats('custom', { headers: {Authorization: "Basic "} }) ---- -to use a https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html[Basic authentication] +to use a https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html[Basic authentication] and create the following HTTP header: ``` Authorization: Basic @@ -160,6 +224,48 @@ Content-Type: application/json ``` +Some APIs in Elastic 8 can be called by the procedures without needing configuration `{version: 'EIGHT'}`, +for example the `apoc.es.stats`, +but for several APIs, it is necessary to set it, to handle the endpoint properly, +for example the `apoc.es.query`. + +.Endpoint +[opts=header] +|=== +| procedure(s) | with version: `DEFAULT` | with version: `EIGHT` +| `apoc.es.stats(host)` | /_stats | same as `DEFAULT` +| `apoc.es.query(host, index, type, query, payload, $conf)` | ///_stats? | //_stats? +| `apoc.es.getRaw/apoc.es.postRaw(host, path, payload, $conf)` | `/` | same as `DEFAULT` +| the others `apoc.es.(host, index, type, id, query, payload, $conf)` procedures | `///_stats?` +By default, the `` and `` will be populated as `_all`, while the ``, if not present, will be removed from the endpoint +| `///_stats?`. Note that you only need to enter one of three values between ``,`` and ``, the others will eventually be excluded from the endpoint. + +The type param is usually an underscore string indicating the type of the API, e.g. `_doc` or `_update` (while previously indicated https://www.elastic.co/guide/en/elasticsearch/reference/6.1/removal-of-types.html[the mapping types]). +This is to allow you to call, for example, https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html[this API] +|=== + + +For example, by using the `apoc.es.query`, we can execute a Search API: +[source, cypher] +---- +CALL apoc.es.query(<$host>, <$index>, <$type>, 'q=name:Neo4j', null, { version: 'EIGHT' }) +---- + +Updates a document in Elastic 8 via https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html#docs-update[Update API]: + +[source, cypher] +---- +CALL apoc.es.put($host,'','_doc','','refresh=true',{name: 'foo'}, {version: 'EIGHT'}) +---- + +Call a https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#indices-create-index[Create Index API] in elastic 8: + +[source, cypher] +---- +CALL apoc.es.put($host,'', null, null, null, null, { version: 'EIGHT' }) +---- + + === Results -Results are stream of map in value. +Results are stream of map in value. \ No newline at end of file diff --git a/full-it/build.gradle b/full-it/build.gradle index a7e3e0bca3..7edc0248d2 100644 --- a/full-it/build.gradle +++ b/full-it/build.gradle @@ -15,6 +15,7 @@ dependencies { testImplementation group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.12.770' testImplementation group: 'org.xmlunit', name: 'xmlunit-core', version: '2.9.1' + testImplementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.9.0' configurations.all { exclude group: 'org.slf4j', module: 'slf4j-nop' diff --git a/full/src/test/java/apoc/es/ElasticSearchTest.java b/full-it/src/test/java/apoc/full/it/es/ElasticSearchTest.java similarity index 56% rename from full/src/test/java/apoc/es/ElasticSearchTest.java rename to full-it/src/test/java/apoc/full/it/es/ElasticSearchTest.java index 90c3e31191..fc22cfa37e 100644 --- a/full/src/test/java/apoc/es/ElasticSearchTest.java +++ b/full-it/src/test/java/apoc/full/it/es/ElasticSearchTest.java @@ -1,29 +1,15 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package apoc.es; +package apoc.full.it.es; import static apoc.ApocConfig.apocConfig; -import static org.junit.Assert.*; +import static apoc.util.MapUtil.map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import apoc.es.ElasticSearch; import apoc.util.JsonUtil; import apoc.util.TestUtil; import apoc.util.Util; +import com.fasterxml.jackson.core.JsonProcessingException; import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; @@ -32,7 +18,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.neo4j.test.rule.DbmsRule; @@ -43,70 +28,58 @@ * @author mh * @since 21.05.16 */ -public class ElasticSearchTest { +public abstract class ElasticSearchTest { private static final String URL_CONF = "apoc.es.url"; - private static String HTTP_HOST_ADDRESS; - private static String HTTP_URL_ADDRESS; + static String HTTP_HOST_ADDRESS; + static String HTTP_URL_ADDRESS; public static ElasticsearchContainer elastic; - private static final String ES_INDEX = "test-index"; - - private static final String ES_TYPE = "test-type"; + static final String ES_INDEX = "test-index"; - private static final String ES_ID = UUID.randomUUID().toString(); + abstract String getEsType(); - private static final String HOST = "localhost"; + static final String ES_ID = UUID.randomUUID().toString(); private static final String DOCUMENT = "{\"name\":\"Neo4j\",\"company\":\"Neo Technology\",\"description\":\"Awesome stuff with a graph database\"}"; + static final String password = "myPassword"; + static Map basicAuthHeader = + Map.of("Authorization", "Basic " + Base64.getEncoder().encodeToString(("elastic:" + password).getBytes())); + @ClassRule public static DbmsRule db = new ImpermanentDbmsRule(); - private static Map defaultParams = Util.map("index", ES_INDEX, "type", ES_TYPE, "id", ES_ID); - private static Map paramsWithBasicAuth; - private static Map basicAuthHeader; + static Map paramsWithBasicAuth; - // We need a reference to the class implementing the procedures - private final ElasticSearch es = new ElasticSearch(); private static final Configuration JSON_PATH_CONFIG = Configuration.builder() .options(Option.DEFAULT_PATH_LEAF_TO_NULL, Option.SUPPRESS_EXCEPTIONS) .build(); - @BeforeClass - public static void setUp() throws Exception { - final String password = "myPassword"; - elastic = new ElasticsearchContainer(); + static void getElasticContainer(String tag, Map envMap, Map params) + throws JsonProcessingException { + + elastic = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:" + tag) + .withPassword(password) + .withEnv(envMap); elastic.start(); - defaultParams.put("host", elastic.getHttpHostAddress()); String httpHostAddress = elastic.getHttpHostAddress(); HTTP_HOST_ADDRESS = String.format("elastic:%s@%s", password, httpHostAddress); HTTP_URL_ADDRESS = "http://" + HTTP_HOST_ADDRESS; - defaultParams.put("host", HTTP_HOST_ADDRESS); - defaultParams.put("url", HTTP_URL_ADDRESS); - - // We can authenticate to elastic using the url `:@` - // or via Basic authentication, i.e. using the url `` together with the header `Authorization: - // Basic ` - // where is Base64(:) - String token = Base64.getEncoder().encodeToString(("elastic:" + password).getBytes()); - basicAuthHeader = Map.of("Authorization", "Basic " + token); - - paramsWithBasicAuth = new HashMap<>(defaultParams); - paramsWithBasicAuth.put("host", elastic.getHttpHostAddress()); - paramsWithBasicAuth.put("headers", basicAuthHeader); - + params.put("host", elastic.getHttpHostAddress()); + params.put("url", "http://" + elastic.getHttpHostAddress()); + paramsWithBasicAuth = params; TestUtil.registerProcedure(db, ElasticSearch.class); insertDocuments(); } - private static String getRawProcedureUrl(String id) { - return ES_INDEX + "/" + ES_TYPE + "/" + id + "?refresh=true"; + static String getRawProcedureUrl(String id, String type) { + return ES_INDEX + "/" + type + "/" + id + "?refresh=true"; } @AfterClass @@ -121,25 +94,28 @@ public static void tearDown() { * @param payload * @return */ - private static Map createDefaultProcedureParametersWithPayloadAndId(String payload, String id) { + static Map createDefaultProcedureParametersWithPayloadAndId(String payload, String id) { try { Map mapPayload = JsonUtil.OBJECT_MAPPER.readValue(payload, Map.class); - return addPayloadAndIdToParams(defaultParams, mapPayload, id); + return addPayloadAndIdToParams(paramsWithBasicAuth, mapPayload, id); } catch (IOException e) { throw new RuntimeException(e); } } - private static Map addPayloadAndIdToParams(Map params, Object payload, String id) { + static Map addPayloadAndIdToParams(Map params, Object payload, String id) { return Util.merge(params, Util.map("payload", payload, "id", id)); } - private static void insertDocuments() { + private static void insertDocuments() throws JsonProcessingException { Map params = createDefaultProcedureParametersWithPayloadAndId( "{\"procedurePackage\":\"es\",\"procedureName\":\"get\",\"procedureDescription\":\"perform a GET operation to ElasticSearch\"}", UUID.randomUUID().toString()); TestUtil.testCall( - db, "CALL apoc.es.put($host,$index,$type,$id,'refresh=true',$payload) yield value", params, r -> { + db, + "CALL apoc.es.put($host,$index,$type,$id,'refresh=true',$payload, $config) yield value", + params, + r -> { Object created = extractValueFromResponse(r, "$.result"); assertEquals("created", created); }); @@ -148,38 +124,31 @@ private static void insertDocuments() { "{\"procedurePackage\":\"es\",\"procedureName\":\"post\",\"procedureDescription\":\"perform a POST operation to ElasticSearch\"}", UUID.randomUUID().toString()); TestUtil.testCall( - db, "CALL apoc.es.put($host,$index,$type,$id,'refresh=true',$payload) yield value", params, r -> { + db, + "CALL apoc.es.put($host,$index,$type,$id,'refresh=true',$payload, $config) yield value", + params, + r -> { Object created = extractValueFromResponse(r, "$.result"); assertEquals("created", created); }); params = createDefaultProcedureParametersWithPayloadAndId(DOCUMENT, ES_ID); TestUtil.testCall( - db, "CALL apoc.es.put($host,$index,$type,$id,'refresh=true',$payload) yield value", params, r -> { + db, + "CALL apoc.es.put($host,$index,$type,$id,'refresh=true',$payload, $config) yield value", + params, + r -> { Object created = extractValueFromResponse(r, "$.result"); assertEquals("created", created); }); } - private static Object extractValueFromResponse(Map response, String jsonPath) { + static Object extractValueFromResponse(Map response, String jsonPath) { Object jsonResponse = response.get("value"); assertNotNull(jsonResponse); String json = JsonPath.parse(jsonResponse).jsonString(); - Object value = JsonPath.parse(json, JSON_PATH_CONFIG).read(jsonPath); - - return value; - } - - @Test - public void testStats() throws Exception { - TestUtil.testCall(db, "CALL apoc.es.stats($host)", defaultParams, commonEsStatsConsumer()); - } - - @Test - public void testStatsWithAuthHeader() { - TestUtil.testCall( - db, "CALL apoc.es.stats($host, {headers: $headers})", paramsWithBasicAuth, commonEsStatsConsumer()); + return JsonPath.parse(json, JSON_PATH_CONFIG).read(jsonPath); } /** @@ -192,45 +161,31 @@ public void testStatsWithAuthHeader() { public void testGetWithQueryNull() throws Exception { TestUtil.testCall( db, - "CALL apoc.es.get($host,$index,$type,$id,null,null) yield value", - defaultParams, - commonEsGetConsumer()); - } - - @Test - public void testProceduresWithUrl() { - TestUtil.testCall(db, "CALL apoc.es.stats($url)", defaultParams, commonEsStatsConsumer()); - - TestUtil.testCall( - db, - "CALL apoc.es.get($url,$index,$type,$id,null,null) yield value", - defaultParams, + "CALL apoc.es.get($host,$index,$type,$id,null,null,$config) yield value", + paramsWithBasicAuth, commonEsGetConsumer()); } @Test public void testProceduresWithUrlAndHeaders() { - TestUtil.testCall( - db, "CALL apoc.es.stats($url, {headers: $headers})", paramsWithBasicAuth, commonEsStatsConsumer()); + TestUtil.testCall(db, "CALL apoc.es.stats($url, $config)", paramsWithBasicAuth, commonEsStatsConsumer()); TestUtil.testCall( db, - "CALL apoc.es.get($url,$index,$type,$id,null,null) yield value", + "CALL apoc.es.get($url,$index,$type,$id,null,null,$config) yield value", paramsWithBasicAuth, commonEsGetConsumer()); } - @Test - public void testGetRowProcedure() { - Map params = Map.of("url", HTTP_URL_ADDRESS, "suffix", getRawProcedureUrl(ES_ID)); - - TestUtil.testCall(db, "CALL apoc.es.getRaw($url,$suffix, null)", params, commonEsGetConsumer()); - } - @Test public void testGetRowProcedureWithAuthHeader() { Map params = Map.of( - "url", elastic.getHttpHostAddress(), "suffix", getRawProcedureUrl(ES_ID), "headers", basicAuthHeader); + "url", + elastic.getHttpHostAddress(), + "suffix", + getRawProcedureUrl(ES_ID, getEsType()), + "headers", + basicAuthHeader); TestUtil.testCall( db, "CALL apoc.es.getRaw($url, $suffix, null, {headers: $headers})", params, commonEsGetConsumer()); @@ -245,54 +200,13 @@ public void testProceduresWithUrlKeyConfOverridingGenericUrlConf() { TestUtil.testCall( db, - "CALL apoc.es.get('customKey',$index,$type,$id,null,null) yield value", - defaultParams, + "CALL apoc.es.get('customKey',$index,$type,$id,null,null, $config) yield value", + paramsWithBasicAuth, commonEsGetConsumer()); apocConfig().getConfig().clearProperty(URL_CONF); } - @Test - public void testProceduresWithUrlKeyConf() { - apocConfig().setProperty("apoc.es.myUrlKey.url", HTTP_URL_ADDRESS); - - TestUtil.testCall(db, "CALL apoc.es.stats('myUrlKey')", commonEsStatsConsumer()); - - TestUtil.testCall( - db, - "CALL apoc.es.get('myUrlKey',$index,$type,$id,null,null) yield value", - defaultParams, - commonEsGetConsumer()); - } - - @Test - public void testProceduresWithHostKeyConf() { - apocConfig().setProperty("apoc.es.myHostKey.host", HTTP_HOST_ADDRESS); - - TestUtil.testCall(db, "CALL apoc.es.stats('myHostKey')", commonEsStatsConsumer()); - - TestUtil.testCall( - db, - "CALL apoc.es.get('myHostKey',$index,$type,$id,null,null) yield value", - defaultParams, - commonEsGetConsumer()); - } - - /** - * Simple get request for document retrieval but we also send multiple commands (as a Map) to ES - * http://localhost:9200/test-index/test-type/4fa40c40-db89-4761-b6a3-75f0015db059?_source_includes=name&_source_excludes=description - * - * @throws Exception - */ - @Test - public void testGetWithQueryAsMapMultipleParams() throws Exception { - TestUtil.testCall( - db, - "CALL apoc.es.get($host,$index,$type,$id,{_source_includes:'name',_source_excludes:'description'},null) yield value", - defaultParams, - commonEsGetConsumer()); - } - /** * Simple get request for document retrieval but we also send a single command (as a Map) to ES * http://localhost:9200/test-index/test-type/4fa40c40-db89-4761-b6a3-75f0015db059?_source_includes=name @@ -303,13 +217,13 @@ public void testGetWithQueryAsMapMultipleParams() throws Exception { public void testGetWithQueryAsMapSingleParam() throws Exception { TestUtil.testCall( db, - "CALL apoc.es.get($host,$index,$type,$id,{_source_includes:'name'},null) yield value", - defaultParams, + "CALL apoc.es.get($host,$index,$type,$id,{_source_includes:'name'},null, $config) yield value", + paramsWithBasicAuth, commonEsGetConsumer()); } /** - * Simple get request for document retrieval but we also send multiple commands (as a string) to ES + * Simple get request for document retrieval, but we also send multiple commands (as a string) to ES * http://localhost:9200/test-index/test-type/4fa40c40-db89-4761-b6a3-75f0015db059?_source_includes=name&_source_excludes=description * * @throws Exception @@ -318,8 +232,8 @@ public void testGetWithQueryAsMapSingleParam() throws Exception { public void testGetWithQueryAsStringMultipleParams() throws Exception { TestUtil.testCall( db, - "CALL apoc.es.get($host,$index,$type,$id,'_source_includes=name&_source_excludes=description',null) yield value", - defaultParams, + "CALL apoc.es.get($host,$index,$type,$id,'_source_includes=name&_source_excludes=description',null, $config) yield value", + paramsWithBasicAuth, commonEsGetConsumer()); } @@ -330,11 +244,11 @@ public void testGetWithQueryAsStringMultipleParams() throws Exception { * @throws Exception */ @Test - public void testGetWithQueryAsStringSingleParam() throws Exception { + public void testGetWithHeaderAndQueryAsStringSingleParam() throws Exception { TestUtil.testCall( db, - "CALL apoc.es.get($host,$index,$type,$id,'_source_includes=name',null) yield value", - defaultParams, + "CALL apoc.es.get($host,$index,$type,$id,'_source_includes=name',null, $config) yield value", + paramsWithBasicAuth, commonEsGetConsumer()); } @@ -344,30 +258,23 @@ public void testGetWithQueryAsStringSingleParam() throws Exception { */ @Test public void testSearchWithQueryNull() throws Exception { - TestUtil.testCall(db, "CALL apoc.es.query($host,$index,$type,null,null) yield value", defaultParams, r -> { - Object hits = extractValueFromResponse(r, "$.hits.hits"); - assertEquals(3, ((List) hits).size()); - }); + TestUtil.testCall( + db, "CALL apoc.es.query($host,$index,$type,null,null, $config) yield value", paramsWithBasicAuth, r -> { + Object hits = extractValueFromResponse(r, "$.hits.hits"); + assertEquals(3, ((List) hits).size()); + }); } - /** - * We want to search our document by name --> /test-index/test-type/_search?q=name:Neo4j - * This test uses a plain string to query ES - */ @Test - public void testSearchWithQueryAsAString() throws Exception { - TestUtil.testCall( - db, "CALL apoc.es.query($host,$index,$type,'q=name:Neo4j',null) yield value", defaultParams, r -> { - Object name = extractValueFromResponse(r, "$.hits.hits[0]._source.name"); - assertEquals("Neo4j", name); - }); + public void testStatsWithAuthHeader() { + TestUtil.testCall(db, "CALL apoc.es.stats($host, $config)", paramsWithBasicAuth, commonEsStatsConsumer()); } @Test public void testSearchWithQueryAsAStringAndHeader() throws Exception { TestUtil.testCall( db, - "CALL apoc.es.query($host, $index, $type, 'q=name:Neo4j', null, {headers: $headers}) yield value", + "CALL apoc.es.query($host, $index, $type, 'q=name:Neo4j', null, $config) yield value", paramsWithBasicAuth, r -> { Object name = extractValueFromResponse(r, "$.hits.hits[0]._source.name"); @@ -382,7 +289,10 @@ public void testSearchWithQueryAsAStringAndHeader() throws Exception { @Test public void testFullSearchWithQueryAsAString() throws Exception { TestUtil.testCall( - db, "CALL apoc.es.query($host,$index,$type,'q=name:*',null) yield value", defaultParams, r -> { + db, + "CALL apoc.es.query($host,$index,$type,'q=name:*',null, $config) yield value", + paramsWithBasicAuth, + r -> { Object name = extractValueFromResponse(r, "$.hits.hits[0]._source.name"); assertEquals("Neo4j", name); }); @@ -396,8 +306,8 @@ public void testFullSearchWithQueryAsAString() throws Exception { public void testFullSearchWithQueryAsAStringWithEquals() throws Exception { TestUtil.testCall( db, - "CALL apoc.es.query($host,$index,$type,'q=procedureName:get',null) yield value", - defaultParams, + "CALL apoc.es.query($host,$index,$type,'q=procedureName:get',null, $config) yield value", + paramsWithBasicAuth, r -> { Object name = extractValueFromResponse(r, "$.hits.hits[0]._source.procedureName"); assertEquals("get", name); @@ -412,8 +322,8 @@ public void testFullSearchWithQueryAsAStringWithEquals() throws Exception { public void testFullSearchWithOtherParametersAsAString() throws Exception { TestUtil.testCall( db, - "CALL apoc.es.query($host,$index,$type,'size=1&scroll=1m&_source=true&q=procedureName:get',null) yield value", - defaultParams, + "CALL apoc.es.query($host,$index,$type,'size=1&scroll=1m&_source=true&q=procedureName:get',null, $config) yield value", + paramsWithBasicAuth, r -> { Object hits = extractValueFromResponse(r, "$.hits.hits"); assertEquals(1, ((List) hits).size()); @@ -436,12 +346,15 @@ public void testPutUpdateDocument() throws IOException { Map params = createDefaultProcedureParametersWithPayloadAndId(JsonUtil.OBJECT_MAPPER.writeValueAsString(doc), ES_ID); TestUtil.testCall( - db, "CALL apoc.es.put($host,$index,$type,$id,'refresh=true',$payload) yield value", params, r -> { + db, + "CALL apoc.es.put($host,$index,$type,$id,'refresh=true',$payload, $config) yield value", + params, + r -> { Object updated = extractValueFromResponse(r, "$.result"); assertEquals("updated", updated); }); - TestUtil.testCall(db, "CALL apoc.es.get($host,$index,$type,$id,null,null) yield value", params, r -> { + TestUtil.testCall(db, "CALL apoc.es.get($host,$index,$type,$id,null,null, $config) yield value", params, r -> { Object tag = extractValueFromResponse(r, "$._source.tags[0]"); assertEquals("awesome", tag); }); @@ -456,7 +369,7 @@ public void testPutUpdateDocumentWithAuthHeader() throws IOException { Map params = addPayloadAndIdToParams(paramsWithBasicAuth, doc, ES_ID); TestUtil.testCall( db, - "CALL apoc.es.put($host,$index,$type,$id,'refresh=true',$payload, {headers: $headers}) yield value", + "CALL apoc.es.put($host,$index,$type,$id,'refresh=true',$payload, $config) yield value", params, r -> { Object result = extractValueFromResponse(r, "$.result"); @@ -464,10 +377,7 @@ public void testPutUpdateDocumentWithAuthHeader() throws IOException { }); TestUtil.testCall( - db, - "CALL apoc.es.get($host, $index, $type, $id, null, null, {headers: $headers}) yield value", - params, - r -> { + db, "CALL apoc.es.get($host, $index, $type, $id, null, null, $config) yield value", params, r -> { Object actualTags = extractValueFromResponse(r, "$._source.tags[0]"); assertEquals(tags, actualTags); }); @@ -476,7 +386,7 @@ public void testPutUpdateDocumentWithAuthHeader() throws IOException { @Test public void testPostRawCreateDocument() throws IOException { String index = UUID.randomUUID().toString(); - String type = UUID.randomUUID().toString(); + String type = getEsType(); String id = UUID.randomUUID().toString(); Map payload = JsonUtil.OBJECT_MAPPER.readValue("{\"ajeje\":\"Brazorf\"}", Map.class); Map params = Util.map( @@ -516,7 +426,7 @@ public void testPostRawCreateDocument() throws IOException { @Test public void testPostCreateDocumentWithAuthHeader() throws IOException { String index = UUID.randomUUID().toString(); - String type = UUID.randomUUID().toString(); + String type = getEsType(); Map payload = JsonUtil.OBJECT_MAPPER.readValue("{\"ajeje\":\"Brazorf\"}", Map.class); Map params = Util.map( "host", @@ -529,13 +439,13 @@ public void testPostCreateDocumentWithAuthHeader() throws IOException { payload, "suffix", index, - "headers", - basicAuthHeader); + "config", + map("headers", basicAuthHeader)); AtomicReference id = new AtomicReference<>(); TestUtil.testCall( db, - "CALL apoc.es.post($host,$index,$type,'refresh=true', $payload, {headers: $headers}) yield value", + "CALL apoc.es.post($host,$index,$type,'refresh=true', $payload, $config) yield value", params, r -> { Object result = extractValueFromResponse(r, "$.result"); @@ -547,117 +457,50 @@ public void testPostCreateDocumentWithAuthHeader() throws IOException { params.put("id", id.get()); TestUtil.testCall( - db, - "CALL apoc.es.get($host, $index, $type, $id, null, null, {headers: $headers}) yield value", - params, - r -> { + db, "CALL apoc.es.get($host, $index, $type, $id, null, null, $config) yield value", params, r -> { Object actual = extractValueFromResponse(r, "$._source.ajeje"); assertEquals("Brazorf", actual); }); - TestUtil.testCall( - db, - "CALL apoc.es.delete($host, $index, $type, $id, 'refresh=true', {headers: $headers})", - params, - r -> { - Object result = extractValueFromResponse(r, "$.result"); - assertEquals("deleted", result); - }); + TestUtil.testCall(db, "CALL apoc.es.delete($host, $index, $type, $id, 'refresh=true', $config)", params, r -> { + Object result = extractValueFromResponse(r, "$.result"); + assertEquals("deleted", result); + }); } /** - * We want to to search our document by name --> /test-index/test-type/_search?q=name:Neo4j + * We want to search our document by name --> /test-index/test-type/_search?q=name:Neo4j * This test uses a Map to query ES */ @Test public void testSearchWithQueryAsAMap() { TestUtil.testCall( db, - "CALL apoc.es.query($host,$index,$type,null,{query: {match: {name: 'Neo4j'}}}) yield value", - defaultParams, + "CALL apoc.es.query($host,$index,$type,null,{query: {match: {name: 'Neo4j'}}}, $config) yield value", + paramsWithBasicAuth, r -> { Object name = extractValueFromResponse(r, "$.hits.hits[0]._source.name"); assertEquals("Neo4j", name); }); } - @Test - public void testGetQueryUrlShouldBeTheSameAsOldFormatting() { - String index = ES_INDEX; - String type = ES_TYPE; - String id = ES_ID; - Map query = new HashMap<>(); - query.put("name", "get"); - - String host = HOST; - String hostUrl = es.getElasticSearchUrl(host); - - String queryUrl = hostUrl - + String.format( - "/%s/%s/%s?%s", - index == null ? "_all" : index, - type == null ? "_all" : type, - id == null ? "" : id, - es.toQueryParams(query)); - - assertEquals(queryUrl, es.getQueryUrl(host, index, type, id, query)); - } - - @Test - public void testGetQueryUrlShouldNotHaveTrailingQuestionMarkIfQueryIsNull() { - String index = ES_INDEX; - String type = ES_TYPE; - String id = ES_TYPE; - - String host = HOST; - String hostUrl = es.getElasticSearchUrl(host); - String queryUrl = hostUrl - + String.format( - "/%s/%s/%s?%s", - index == null ? "_all" : index, - type == null ? "_all" : type, - id == null ? "" : id, - es.toQueryParams(null)); - - // First we test the older version against the newest one - assertNotEquals(queryUrl, es.getQueryUrl(host, index, type, id, null)); - assertTrue(!es.getQueryUrl(host, index, type, id, null).endsWith("?")); - } - - @Test - public void testGetQueryUrlShouldNotHaveTrailingQuestionMarkIfQueryIsEmpty() { - String index = ES_INDEX; - String type = ES_TYPE; - String id = ES_ID; - - String host = HOST; - String hostUrl = es.getElasticSearchUrl(host); - String queryUrl = hostUrl - + String.format( - "/%s/%s/%s?%s", - index == null ? "_all" : index, - type == null ? "_all" : type, - id == null ? "" : id, - es.toQueryParams(new HashMap())); - - // First we test the older version against the newest one - assertNotEquals(queryUrl, es.getQueryUrl(host, index, type, id, new HashMap())); - assertTrue(!es.getQueryUrl(host, index, type, id, new HashMap()) - .endsWith("?")); - } - - private static Consumer> commonEsGetConsumer() { + static Consumer> commonEsGetConsumer() { return r -> { Object name = extractValueFromResponse(r, "$._source.name"); assertEquals("Neo4j", name); }; } - private static Consumer> commonEsStatsConsumer() { + static Consumer> commonEsStatsConsumer() { + return commonEsStatsConsumer(3); + } + + static Consumer> commonEsStatsConsumer(int expectedNumOfDocs) { return r -> { assertNotNull(r.get("value")); + Object numOfDocs = extractValueFromResponse(r, "$._all.total.docs.count"); - assertEquals(3, numOfDocs); + assertEquals(expectedNumOfDocs, numOfDocs); }; } } diff --git a/full-it/src/test/java/apoc/full/it/es/ElasticVersionEightTest.java b/full-it/src/test/java/apoc/full/it/es/ElasticVersionEightTest.java new file mode 100644 index 0000000000..791197406d --- /dev/null +++ b/full-it/src/test/java/apoc/full/it/es/ElasticVersionEightTest.java @@ -0,0 +1,88 @@ +package apoc.full.it.es; + +import static apoc.es.ElasticSearchConfig.VERSION_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import apoc.es.ElasticSearchHandler; +import apoc.util.TestUtil; +import apoc.util.Util; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.junit.BeforeClass; +import org.junit.Test; + +public class ElasticVersionEightTest extends ElasticSearchTest { + public static final String ES_TYPE = "_doc"; + + @BeforeClass + public static void setUp() throws Exception { + Map config = + Map.of("headers", basicAuthHeader, VERSION_KEY, ElasticSearchHandler.Version.EIGHT.name()); + Map params = Util.map("index", ES_INDEX, "id", ES_ID, "type", ES_TYPE, "config", config); + + String tag = "8.12.1"; + Map envMap = Map.of( + "xpack.security.http.ssl.enabled", "false", + "cluster.routing.allocation.disk.threshold_enabled", "false"); + + getElasticContainer(tag, envMap, params); + } + + @Override + String getEsType() { + return ES_TYPE; + } + + @Test + public void testCreateIndexAPI() { + TestUtil.testCall( + db, "CALL apoc.es.put($host,'my-index-000001',null,null,null,null,$config)", paramsWithBasicAuth, r -> { + Object actual = ((Map) r.get("value")).get("index"); + assertEquals("my-index-000001", actual); + }); + } + + @Test + public void testGetIndexAPI() { + TestUtil.testCall( + db, + "CALL apoc.es.get($host,$index,null,null,null,null,$config) yield value", + paramsWithBasicAuth, + r -> { + Set valueKeys = ((Map) r.get("value")).keySet(); + assertEquals(Set.of(ES_INDEX), valueKeys); + }); + } + + @Test + public void testSearchWithQueryAsPayload() { + TestUtil.testCall( + db, + "CALL apoc.es.query($host, $index, null, 'pretty', {`_source`: {includes: ['name']}}, $config) yield value", + paramsWithBasicAuth, + this::searchQueryPayloadAssertions); + } + + @Test + public void testSearchWithQueryAsPayloadAndWithoutIndex() { + TestUtil.testCall( + db, + "CALL apoc.es.query($host, null, null, 'pretty', {`_source`: {includes: ['name']}}, $config) yield value", + paramsWithBasicAuth, + this::searchQueryPayloadAssertions); + } + + private void searchQueryPayloadAssertions(Map r) { + List values = (List) extractValueFromResponse(r, "$.hits.hits"); + assertEquals(3, values.size()); + + values.forEach(item -> { + Map source = (Map) item.get("_source"); + + assertTrue( + "Actual _source is: " + source, source.equals(Map.of()) || source.equals(Map.of("name", "Neo4j"))); + }); + } +} diff --git a/full-it/src/test/java/apoc/full/it/es/ElasticVersionSevenTest.java b/full-it/src/test/java/apoc/full/it/es/ElasticVersionSevenTest.java new file mode 100644 index 0000000000..d55ec7c89e --- /dev/null +++ b/full-it/src/test/java/apoc/full/it/es/ElasticVersionSevenTest.java @@ -0,0 +1,202 @@ +package apoc.full.it.es; + +import static apoc.ApocConfig.apocConfig; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import apoc.es.ElasticSearchHandler; +import apoc.util.TestUtil; +import apoc.util.Util; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.junit.BeforeClass; +import org.junit.Test; + +public class ElasticVersionSevenTest extends ElasticSearchTest { + + public static final String ES_TYPE = UUID.randomUUID().toString(); + private static final String HOST = "localhost"; + public static final ElasticSearchHandler DEFAULT_HANDLER = ElasticSearchHandler.Version.DEFAULT.get(); + + private static final Map defaultParams = Util.map("index", ES_INDEX, "type", ES_TYPE, "id", ES_ID); + + @BeforeClass + public static void setUp() throws Exception { + + Map config = Map.of("headers", basicAuthHeader); + Map params = new HashMap<>(defaultParams); + params.put("config", config); + + String tag = "7.9.2"; + getElasticContainer(tag, Map.of(), params); + + String httpHostAddress = elastic.getHttpHostAddress(); + HTTP_HOST_ADDRESS = String.format("elastic:%s@%s", password, httpHostAddress); + + HTTP_URL_ADDRESS = "http://" + HTTP_HOST_ADDRESS; + + defaultParams.put("host", HTTP_HOST_ADDRESS); + defaultParams.put("url", HTTP_URL_ADDRESS); + } + + @Override + String getEsType() { + return ES_TYPE; + } + + /* + Tests without basic auth header + */ + + @Test + public void testGetRowProcedure() { + Map params = Map.of("url", HTTP_URL_ADDRESS, "suffix", getRawProcedureUrl(ES_ID, ES_TYPE)); + + TestUtil.testCall(db, "CALL apoc.es.getRaw($url,$suffix, null)", params, commonEsGetConsumer()); + } + + @Test + public void testStats() throws Exception { + TestUtil.testCall(db, "CALL apoc.es.stats($host)", defaultParams, commonEsStatsConsumer()); + } + + @Test + public void testProceduresWithUrl() { + TestUtil.testCall(db, "CALL apoc.es.stats($url)", defaultParams, commonEsStatsConsumer()); + + TestUtil.testCall( + db, + "CALL apoc.es.get($url,$index,$type,$id,null,null) yield value", + defaultParams, + commonEsGetConsumer()); + } + + @Test + public void testProceduresWithUrlKeyConf() { + apocConfig().setProperty("apoc.es.myUrlKey.url", HTTP_URL_ADDRESS); + + TestUtil.testCall(db, "CALL apoc.es.stats('myUrlKey')", commonEsStatsConsumer()); + + TestUtil.testCall( + db, + "CALL apoc.es.get('myUrlKey',$index,$type,$id,null,null, $config) yield value", + paramsWithBasicAuth, + commonEsGetConsumer()); + } + + @Test + public void testProceduresWithHostKeyConf() { + apocConfig().setProperty("apoc.es.myHostKey.host", HTTP_HOST_ADDRESS); + + TestUtil.testCall(db, "CALL apoc.es.stats('myHostKey')", commonEsStatsConsumer()); + + TestUtil.testCall( + db, + "CALL apoc.es.get('myHostKey',$index,$type,$id,null,null, $config) yield value", + paramsWithBasicAuth, + commonEsGetConsumer()); + } + + @Test + public void testGetWithQueryAsStringSingleParam() { + TestUtil.testCall( + db, + "CALL apoc.es.get($host,$index,$type,$id,'_source_includes=name',null, {}) yield value", + defaultParams, + commonEsGetConsumer()); + } + + /** + * We want to search our document by name --> /test-index/test-type/_search?q=name:Neo4j + * This test uses a plain string to query ES + */ + @Test + public void testSearchWithQueryAsAString() { + TestUtil.testCall( + db, "CALL apoc.es.query($host,$index,$type,'q=name:Neo4j',null) yield value", defaultParams, r -> { + Object name = extractValueFromResponse(r, "$.hits.hits[0]._source.name"); + assertEquals("Neo4j", name); + }); + } + + @Test + public void testGetQueryUrlShouldBeTheSameAsOldFormatting() { + String index = ES_INDEX; + String type = ES_TYPE; + String id = ES_ID; + Map query = new HashMap<>(); + query.put("name", "get"); + + String host = HOST; + String hostUrl = DEFAULT_HANDLER.getElasticSearchUrl(host); + + String queryUrl = hostUrl + + String.format( + "/%s/%s/%s?%s", + index == null ? "_all" : index, + type == null ? "_all" : type, + id == null ? "" : id, + DEFAULT_HANDLER.toQueryParams(query)); + + assertEquals(queryUrl, DEFAULT_HANDLER.getQueryUrl(host, index, type, id, query)); + } + + @Test + public void testGetQueryUrlShouldNotHaveTrailingQuestionMarkIfQueryIsNull() { + String index = ES_INDEX; + String type = ES_TYPE; + String id = ES_TYPE; + + String host = HOST; + String hostUrl = DEFAULT_HANDLER.getElasticSearchUrl(host); + String queryUrl = hostUrl + + String.format( + "/%s/%s/%s?%s", + index, type == null ? "_all" : type, id == null ? "" : id, DEFAULT_HANDLER.toQueryParams(null)); + + // First we test the older version against the newest one + assertNotEquals(queryUrl, DEFAULT_HANDLER.getQueryUrl(host, index, type, id, null)); + assertFalse(DEFAULT_HANDLER.getQueryUrl(host, index, type, id, null).endsWith("?")); + } + + @Test + public void testGetQueryUrlShouldNotHaveTrailingQuestionMarkIfQueryIsEmpty() { + String index = ES_INDEX; + String type = ES_TYPE; + String id = ES_ID; + + String host = HOST; + String hostUrl = DEFAULT_HANDLER.getElasticSearchUrl(host); + String queryUrl = hostUrl + + String.format( + "/%s/%s/%s?%s", + index == null ? "_all" : index, + type == null ? "_all" : type, + id == null ? "" : id, + DEFAULT_HANDLER.toQueryParams(new HashMap())); + + // First we test the older version against the newest one + assertNotEquals(queryUrl, DEFAULT_HANDLER.getQueryUrl(host, index, type, id, new HashMap())); + assertTrue(!DEFAULT_HANDLER + .getQueryUrl(host, index, type, id, new HashMap()) + .endsWith("?")); + } + + /** + * Simple get request for document retrieval but we also send multiple commands (as a Map) to ES + * http://localhost:9200/test-index/test-type/4fa40c40-db89-4761-b6a3-75f0015db059?_source_includes=name&_source_excludes=description + * + * @throws Exception + */ + @Test + public void testGetWithQueryAsMapMultipleParams() throws Exception { + TestUtil.testCall( + db, + "CALL apoc.es.get($host,$index,$type,$id,{_source_includes:'name',_source_excludes:'description'},null) yield value", + defaultParams, + commonEsGetConsumer()); + } +} diff --git a/full/src/main/java/apoc/es/ElasticSearch.java b/full/src/main/java/apoc/es/ElasticSearch.java index 377862b850..a51941d64e 100644 --- a/full/src/main/java/apoc/es/ElasticSearch.java +++ b/full/src/main/java/apoc/es/ElasticSearch.java @@ -1,35 +1,11 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package apoc.es; -import static apoc.util.MapUtil.map; - import apoc.Extended; import apoc.load.LoadJson; import apoc.result.MapResult; -import apoc.util.UrlResolver; import apoc.util.Util; import java.util.Collections; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.neo4j.procedure.Description; import org.neo4j.procedure.Name; @@ -42,82 +18,6 @@ @Extended public class ElasticSearch { - private static final String fullQueryTemplate = "/%s/%s/%s?%s"; - - // /{index}/{type}/_search?{query} - private static final String fullQuerySearchTemplate = "/%s/%s/_search?%s"; - - /** - * With this pattern we can match both key:value params and key=value params - */ - private static final Pattern KEY_VALUE = Pattern.compile("(.*)(:|=)(.*)"); - - protected String getElasticSearchUrl(String hostOrKey) { - return new UrlResolver("http", "localhost", 9200).getUrl("es", hostOrKey); - } - - /** - * Get the full Elasticsearch url - * - * @param hostOrKey - * @param index - * @param type - * @param id - * @param query - * @return - */ - protected String getQueryUrl(String hostOrKey, String index, String type, String id, Object query) { - return getElasticSearchUrl(hostOrKey) + formatQueryUrl(index, type, id, query); - } - - /** - * @param hostOrKey - * @param index - * @param type - * @param query - * @return - */ - protected String getSearchQueryUrl(String hostOrKey, String index, String type, Object query) { - return getElasticSearchUrl(hostOrKey) + formatSearchQueryUrl(index, type, query); - } - - /** - * @param index - * @param type - * @param query - * @return - */ - private String formatSearchQueryUrl(String index, String type, Object query) { - String queryUrl = String.format( - fullQuerySearchTemplate, - index == null ? "_all" : index, - type == null ? "_all" : type, - toQueryParams(query)); - - return queryUrl.endsWith("?") ? queryUrl.substring(0, queryUrl.length() - 1) : queryUrl; - } - - /** - * Format the query url template according to the parameters. - * The format will be /{index}/{type}/{id}?{query} if query is not empty (or null) otherwise the format will be /{index}/{type}/{id} - * - * @param index - * @param type - * @param id - * @param query - * @return - */ - private String formatQueryUrl(String index, String type, String id, Object query) { - String queryUrl = String.format( - fullQueryTemplate, - index == null ? "_all" : index, - type == null ? "_all" : type, - id == null ? "" : id, - toQueryParams(query)); - - return queryUrl.endsWith("?") ? queryUrl.substring(0, queryUrl.length() - 1) : queryUrl; - } - /** * @param payload * @return @@ -128,42 +28,18 @@ protected String toPayload(Object payload) { return payload.toString(); } - /** - * @param query - * @return - */ - protected String toQueryParams(Object query) { - if (query == null) return ""; - if (query instanceof Map) { - Map map = (Map) query; - if (map.isEmpty()) return ""; - return map.entrySet().stream() - .map(e -> e.getKey() + "=" - + Util.encodeUrlComponent(e.getValue().toString())) - .collect(Collectors.joining("&")); - } else { - // We have to encode only the values not the keys - return Pattern.compile("&") - .splitAsStream(query.toString()) - .map(KEY_VALUE::matcher) - .filter(Matcher::matches) - .map(matcher -> matcher.group(1) + matcher.group(2) + Util.encodeUrlComponent(matcher.group(3))) - .collect(Collectors.joining("&")); - } - } - @Procedure @Description("apoc.es.stats(host-or-key,$config) - elastic search statistics") public Stream stats( - @Name("hostOrKey") String hostOrKey, - @Name(value = "config", defaultValue = "{}") Map config) { - String url = getElasticSearchUrl(hostOrKey); - return loadJsonStream(url + "/_stats", new ElasticSearchConfig(config), null); + @Name("host") String hostOrKey, @Name(value = "config", defaultValue = "{}") Map config) { + ElasticSearchConfig conf = new ElasticSearchConfig(config); + String url = conf.getVersion().getElasticSearchUrl(hostOrKey); + return loadJsonStream(url + "/_stats", conf, null); } @Procedure @Description( - "apoc.es.get(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null) yield value - perform a GET operation on elastic search") + "apoc.es.get(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value - perform a GET operation on elastic search") public Stream get( @Name("hostOrKey") String hostOrKey, @Name("index") String index, @@ -172,13 +48,14 @@ public Stream get( @Name("query") Object query, @Name("payload") Object payload, @Name(value = "config", defaultValue = "{}") Map config) { - return loadJsonStream( - getQueryUrl(hostOrKey, index, type, id, query), new ElasticSearchConfig(config), toPayload(payload)); + ElasticSearchConfig conf = new ElasticSearchConfig(config); + String queryUrl = conf.getVersion().getQueryUrl(hostOrKey, index, type, id, query); // .replace("mytype/", ""); + return loadJsonStream(queryUrl, conf, toPayload(payload)); } @Procedure @Description( - "apoc.es.query(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null) yield value - perform a SEARCH operation on elastic search") + "apoc.es.query(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null,$config) yield value - perform a SEARCH operation on elastic search") public Stream query( @Name("hostOrKey") String hostOrKey, @Name("index") String index, @@ -186,37 +63,42 @@ public Stream query( @Name("query") Object query, @Name("payload") Object payload, @Name(value = "config", defaultValue = "{}") Map config) { - return loadJsonStream( - getSearchQueryUrl(hostOrKey, index, type, query), new ElasticSearchConfig(config), toPayload(payload)); + ElasticSearchConfig conf = new ElasticSearchConfig(config); + String searchQueryUrl = + conf.getVersion().getSearchQueryUrl(hostOrKey, index, type, query); // .replace("mytype/", ""); + + return loadJsonStream(searchQueryUrl, conf, toPayload(payload)); } @Procedure @Description( - "apoc.es.getRaw(host-or-key,path,payload-or-null) yield value - perform a raw GET operation on elastic search") + "apoc.es.getRaw(host-or-key,path,payload-or-null,$config) yield value - perform a raw GET operation on elastic search") public Stream getRaw( @Name("hostOrKey") String hostOrKey, @Name("path") String suffix, @Name("payload") Object payload, @Name(value = "config", defaultValue = "{}") Map config) { - String url = getElasticSearchUrl(hostOrKey); - return loadJsonStream(url + "/" + suffix, new ElasticSearchConfig(config), toPayload(payload)); + ElasticSearchConfig conf = new ElasticSearchConfig(config); + String url = conf.getVersion().getElasticSearchUrl(hostOrKey); + return loadJsonStream(url + "/" + suffix, conf, toPayload(payload)); } @Procedure @Description( - "apoc.es.postRaw(host-or-key,path,payload-or-null) yield value - perform a raw POST operation on elastic search") + "apoc.es.postRaw(host-or-key,path,payload-or-null,$config) yield value - perform a raw POST operation on elastic search") public Stream postRaw( @Name("hostOrKey") String hostOrKey, @Name("path") String suffix, @Name("payload") Object payload, @Name(value = "config", defaultValue = "{}") Map config) { - String url = getElasticSearchUrl(hostOrKey); - return loadJsonStream(url + "/" + suffix, new ElasticSearchConfig(config, "POST"), toPayload(payload)); + ElasticSearchConfig conf = new ElasticSearchConfig(config, "POST"); + String url = conf.getVersion().getElasticSearchUrl(hostOrKey); + return loadJsonStream(url + "/" + suffix, conf, toPayload(payload)); } @Procedure @Description( - "apoc.es.post(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null) yield value - perform a POST operation on elastic search") + "apoc.es.post(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null,$config) yield value - perform a POST operation on elastic search") public Stream post( @Name("hostOrKey") String hostOrKey, @Name("index") String index, @@ -227,15 +109,14 @@ public Stream post( if (payload == null) { payload = Collections.emptyMap(); } - return loadJsonStream( - getQueryUrl(hostOrKey, index, type, null, query), - new ElasticSearchConfig(config, "POST"), - toPayload(payload)); + ElasticSearchConfig conf = new ElasticSearchConfig(config, "POST"); + String queryUrl = conf.getVersion().getQueryUrl(hostOrKey, index, type, null, query); + return loadJsonStream(queryUrl, conf, toPayload(payload)); } @Procedure @Description( - "apoc.es.put(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null) yield value - perform a PUT operation on elastic search") + "apoc.es.put(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value - perform a PUT operation on elastic search") public Stream put( @Name("hostOrKey") String hostOrKey, @Name("index") String index, @@ -247,10 +128,10 @@ public Stream put( if (payload == null) { payload = Collections.emptyMap(); } - return loadJsonStream( - getQueryUrl(hostOrKey, index, type, id, query), - new ElasticSearchConfig(config, "PUT"), - toPayload(payload)); + + ElasticSearchConfig conf = new ElasticSearchConfig(config, "PUT"); + String queryUrl = conf.getVersion().getQueryUrl(hostOrKey, index, type, id, query); + return loadJsonStream(queryUrl, conf, toPayload(payload)); } @Procedure @@ -269,8 +150,9 @@ public Stream delete( Otherwise, an error `Cannot write to a URLConnection if doOutput=false - call setDoOutput(true)` will be thrown */ String payload = ""; - return loadJsonStream( - getQueryUrl(hostOrKey, index, type, id, query), new ElasticSearchConfig(config, "DELETE"), payload); + ElasticSearchConfig conf = new ElasticSearchConfig(config, "DELETE"); + String queryUrl = conf.getVersion().getQueryUrl(hostOrKey, index, type, id, query); + return loadJsonStream(queryUrl, conf, payload); } private Stream loadJsonStream( diff --git a/full/src/main/java/apoc/es/ElasticSearchConfig.java b/full/src/main/java/apoc/es/ElasticSearchConfig.java index 8c284dc6a0..c06bc0d108 100644 --- a/full/src/main/java/apoc/es/ElasticSearchConfig.java +++ b/full/src/main/java/apoc/es/ElasticSearchConfig.java @@ -18,14 +18,18 @@ */ package apoc.es; +import static apoc.es.ElasticSearchHandler.Version; + import java.util.Collections; import java.util.HashMap; import java.util.Map; public class ElasticSearchConfig { public static final String HEADERS_KEY = "headers"; + public static final String VERSION_KEY = "version"; private final Map headers; + private final ElasticSearchHandler version; public ElasticSearchConfig(Map config) { this(config, null); @@ -42,9 +46,16 @@ public ElasticSearchConfig(Map config, String httpMethod) { headerConf.putIfAbsent("method", httpMethod); } this.headers = headerConf; + + String versionConf = (String) config.getOrDefault(VERSION_KEY, Version.DEFAULT.name()); + this.version = Version.valueOf(versionConf).get(); } public Map getHeaders() { return headers; } + + public ElasticSearchHandler getVersion() { + return version; + } } diff --git a/full/src/main/java/apoc/es/ElasticSearchHandler.java b/full/src/main/java/apoc/es/ElasticSearchHandler.java new file mode 100644 index 0000000000..eb13096460 --- /dev/null +++ b/full/src/main/java/apoc/es/ElasticSearchHandler.java @@ -0,0 +1,145 @@ +package apoc.es; + +import apoc.util.UrlResolver; +import apoc.util.Util; +import java.util.Arrays; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.NotNull; + +public abstract class ElasticSearchHandler { + + /** + * With this pattern we can match both key:value params and key=value params + */ + private static final Pattern KEY_VALUE = Pattern.compile("(.*)(:|=)(.*)"); + + public String getElasticSearchUrl(String hostOrKey) { + return new UrlResolver("http", "localhost", 9200).getUrl("es", hostOrKey); + } + + /** + * @param query + * @return + */ + public String toQueryParams(Object query) { + if (query == null) return ""; + if (query instanceof Map) { + Map map = (Map) query; + if (map.isEmpty()) return ""; + return map.entrySet().stream() + .map(e -> e.getKey() + "=" + + Util.encodeUrlComponent(e.getValue().toString())) + .collect(Collectors.joining("&")); + } else { + // We have to encode only the values not the keys + return Pattern.compile("&") + .splitAsStream(query.toString()) + .map(KEY_VALUE::matcher) + .filter(Matcher::matches) + .map(matcher -> matcher.group(1) + matcher.group(2) + Util.encodeUrlComponent(matcher.group(3))) + .collect(Collectors.joining("&")); + } + } + + /** + * Get the full Elasticsearch url + */ + public String getQueryUrl(String hostOrKey, String index, String type, String id, Object query) { + return getElasticSearchUrl(hostOrKey) + formatQueryUrl(index, type, id, query); + } + + /** + * Get the full Elasticsearch search url + */ + public String getSearchQueryUrl(String hostOrKey, String index, String type, Object query) { + return getElasticSearchUrl(hostOrKey) + formatSearchQueryUrl(index, type, query); + } + + /** + * Format the Search API url template according to the parameters. + */ + public abstract String formatSearchQueryUrl(String index, String type, Object query); + + /** + * Format the query url template according to the parameters. + * The format will be /{index}/{type}/{id}?{query} if query is not empty (or null) otherwise the format will be /{index}/{type}/{id} + */ + public abstract String formatQueryUrl(String index, String type, String id, Object query); + + public enum Version { + EIGHT(new Eight()), + DEFAULT(new Default()); + + private final ElasticSearchHandler handler; + + Version(ElasticSearchHandler handler) { + this.handler = handler; + } + + public ElasticSearchHandler get() { + return handler; + } + } + + public static class Eight extends ElasticSearchHandler { + + @Override + public String formatSearchQueryUrl(String index, String type, Object query) { + + String queryUrl = String.format("/%s/_search?%s", index == null ? "_all" : index, toQueryParams(query)); + + return removeTerminalQuote(queryUrl); + } + + @Override + public String formatQueryUrl(String index, String type, String id, Object query) { + + String queryUrl = Arrays.asList(index, type, id).stream() + .filter(StringUtils::isNotBlank) + .collect(Collectors.joining("/")); + + String queryParams = toQueryParams(query); + queryParams = "".equals(queryParams) ? "" : ("?" + queryParams); + + return "/" + queryUrl + queryParams; + } + } + + public static class Default extends ElasticSearchHandler { + + private final String fullQueryTemplate = "/%s/%s/%s?%s"; + private final String fullQuerySearchTemplate = "/%s/%s/_search?%s"; + + @Override + public String formatSearchQueryUrl(String index, String type, Object query) { + String queryUrl = String.format( + fullQuerySearchTemplate, + index == null ? "_all" : index, + type == null ? "_all" : type, + toQueryParams(query)); + + return removeTerminalQuote(queryUrl); + } + + @Override + public String formatQueryUrl(String index, String type, String id, Object query) { + String queryUrl = String.format( + fullQueryTemplate, + index == null ? "_all" : index, + type == null ? "_all" : type, + id == null ? "" : id, + toQueryParams(query)); + + return removeTerminalQuote(queryUrl); + } + } + + @NotNull + private static String removeTerminalQuote(String queryUrl) { + return queryUrl.endsWith("?") ? queryUrl.substring(0, queryUrl.length() - 1) : queryUrl; + } +}