Skip to content

Commit

Permalink
[NOID] Fixes #3360: Add support for newer Elasticsearch search api
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 committed Dec 4, 2024
1 parent cdb798c commit 9b13481
Show file tree
Hide file tree
Showing 6 changed files with 603 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ include::example$generated-documentation/apoc.es.delete.adoc[]
|===
// end::elasticsearch[]

[NOTE]
====
It is currently not possible to query Elastic 8 via certificate,
but only disabling ssl with the configuration `"xpack.security.http.ssl.enabled=false"`, using the basic authentication via the header config (see `config parameter` below)
or (not recommended) disabling security via `xpack.security.enabled=false`
====


== Example

Expand Down Expand Up @@ -142,6 +149,8 @@ Config can be an optional *map*, which can have the following entries:
| headers | `Map` | {`content-type`: "application/json", `method`, "<httpMethod>"} | Contains a header map to add (or replace) the default one.
The `method: <httpMethod>` is needed by APOC to figure out under the hood, which http request method to pass.
That is, by default, it is `PUT` with the `apoc.es.put`, POST with the `apoc.es.post` and `apoc.es.postRaw`, and GET in other cases.
| version | `String` | `DEFAULT` | Can be `DEFAULT` and `EIGHT`, in order to change the RestAPI endpoint based on Elastic version.
See `Endpoint` table below.
|===


Expand All @@ -160,6 +169,48 @@ Content-Type: application/json
```


Some APIs in Elastic 8 can be called by the procedures without needing configuration `{version: 'EIGHT'}`,
for example the `apoc.es.stats`,
but for several APIs, it is necessary to set it, to handle the endpoint properly,
for example the `apoc.es.query`.

.Endpoint
[opts=header]
|===
| procedure(s) | with version: `DEFAULT` | with version: `EIGHT`
| `apoc.es.stats(host)` | <host>/_stats | same as `DEFAULT`
| `apoc.es.query(host, index, type, query, payload, $conf)` | <host>/<index param>/<type param>/_stats?<query param> | <host>/<index param>/_stats?<query param>
| `apoc.es.getRaw/apoc.es.postRaw(host, path, payload, $conf)` | `<host>/<path param>` | same as `DEFAULT`
| the others `apoc.es.<name>(host, index, type, id, query, payload, $conf)` procedures | `<host>/<index param>/<type param>/<id param>_stats?<query param>`
By default, the `<index param>` and `<id param>` will be populated as `_all`, while the `<id param>`, if not present, will be removed from the endpoint
| `<host>/<index param>/<type param>/<id param>_stats?<query param>`. Note that you only need to enter one of three values between `<index param>`,`<id param>` and `<type param>`, the others will eventually be excluded from the endpoint.

The type param is usually an underscore string indicating the type of the API, e.g. `_doc` or `_update` (while previously indicated https://www.elastic.co/guide/en/elasticsearch/reference/6.1/removal-of-types.html[the mapping types]).
This is to allow you to call, for example, https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html[this API]
|===


For example, by using the `apoc.es.query`, we can execute a Search API:
[source, cypher]
----
CALL apoc.es.query(<$host>, <$index>, <$type>, 'q=name:Neo4j', null, { version: 'EIGHT' })
----

Updates a document in Elastic 8 via https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html#docs-update[Update API]:

[source, cypher]
----
CALL apoc.es.put($host,'<indexName>','_doc','<idName>','refresh=true',{name: 'foo'}, {version: 'EIGHT'})
----

Call a https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#indices-create-index[Create Index API] in elastic 8:

[source, cypher]
----
CALL apoc.es.put($host,'<indexName>', null, null, null, null, { version: 'EIGHT' })
----


=== Results

Results are stream of map in value.
11 changes: 11 additions & 0 deletions full/src/main/java/apoc/es/ElasticSearchConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import java.util.HashMap;
import java.util.Map;

import static apoc.es.ElasticSearchHandler.Version;

public class ElasticSearchConfig {
public static final String HEADERS_KEY = "headers";
public static final String VERSION_KEY = "version";

private final Map<String, Object> headers;
private final ElasticSearchHandler version;

public ElasticSearchConfig(Map<String, Object> config) {
this(config, null);
Expand All @@ -42,9 +46,16 @@ public ElasticSearchConfig(Map<String, Object> config, String httpMethod) {
headerConf.putIfAbsent("method", httpMethod);
}
this.headers = headerConf;

String versionConf = (String) config.getOrDefault(VERSION_KEY, Version.DEFAULT.name());
this.version = Version.valueOf(versionConf).get();
}

public Map<String, Object> getHeaders() {
return headers;
}

public ElasticSearchHandler getVersion() {
return version;
}
}
145 changes: 145 additions & 0 deletions full/src/main/java/apoc/es/ElasticSearchHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package apoc.es;


import apoc.util.UrlResolver;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public abstract class ElasticSearchHandler {

/**
* With this pattern we can match both key:value params and key=value params
*/
private final static Pattern KEY_VALUE = Pattern.compile("(.*)(:|=)(.*)");

protected String getElasticSearchUrl(String hostOrKey) {
return new UrlResolver("http", "localhost", 9200).getUrl("es", hostOrKey);
}

/**
* @param query
* @return
*/
protected String toQueryParams(Object query) {
if (query == null) return "";
if (query instanceof Map) {
Map<String, Object> map = (Map<String, Object>) query;
if (map.isEmpty()) return "";
return map.entrySet().stream().map(e -> e.getKey() + "=" + Util.encodeUrlComponent(e.getValue().toString())).collect(Collectors.joining("&"));
} else {
// We have to encode only the values not the keys
return Pattern.compile("&").splitAsStream(query.toString())
.map(KEY_VALUE::matcher)
.filter(Matcher::matches)
.map(matcher -> matcher.group(1) + matcher.group(2) + Util.encodeUrlComponent(matcher.group(3)))
.collect(Collectors.joining("&"));
}
}

/**
* Get the full Elasticsearch url
*/
protected String getQueryUrl(String hostOrKey, String index, String type, String id, Object query) {
return getElasticSearchUrl(hostOrKey) + formatQueryUrl(index, type, id, query);
}

/**
* Get the full Elasticsearch search url
*/
protected String getSearchQueryUrl(String hostOrKey, String index, String type, Object query) {
return getElasticSearchUrl(hostOrKey) + formatSearchQueryUrl(index, type, query);
}

/**
* Format the Search API url template according to the parameters.
*/
protected abstract String formatSearchQueryUrl(String index, String type, Object query);

/**
* Format the query url template according to the parameters.
* The format will be /{index}/{type}/{id}?{query} if query is not empty (or null) otherwise the format will be /{index}/{type}/{id}
*/
protected abstract String formatQueryUrl(String index, String type, String id, Object query);

enum Version {
EIGHT(new Eight()),
DEFAULT(new Default());

private final ElasticSearchHandler handler;
Version(ElasticSearchHandler handler) {
this.handler = handler;
}

public ElasticSearchHandler get() {
return handler;
}
}

static class Eight extends ElasticSearchHandler {

@Override
protected String formatSearchQueryUrl(String index, String type, Object query) {

String queryUrl = String.format( "/%s/_search?%s",
index == null ? "_all" : index,
toQueryParams(query));

return removeTerminalQuote(queryUrl);
}

@Override
protected String formatQueryUrl(String index, String type, String id, Object query) {

String queryUrl = Arrays.asList(index, type, id)
.stream()
.filter(StringUtils::isNotBlank)
.collect(Collectors.joining("/"));

String queryParams = toQueryParams(query);
queryParams = "".equals(queryParams)
? ""
: ("?" + queryParams);

return "/" + queryUrl + queryParams;
}
}

static class Default extends ElasticSearchHandler {

private final String fullQueryTemplate = "/%s/%s/%s?%s";
private final String fullQuerySearchTemplate = "/%s/%s/_search?%s";

@Override
protected String formatSearchQueryUrl(String index, String type, Object query) {
String queryUrl = String.format(fullQuerySearchTemplate,
index == null ? "_all" : index,
type == null ? "_all" : type,
toQueryParams(query));

return removeTerminalQuote(queryUrl);
}

@Override
protected String formatQueryUrl(String index, String type, String id, Object query) {
String queryUrl = String.format(fullQueryTemplate,
index == null ? "_all" : index,
type == null ? "_all" : type,
id == null ? "" : id,
toQueryParams(query));

return removeTerminalQuote(queryUrl);
}
}

@NotNull
private static String removeTerminalQuote(String queryUrl) {
return queryUrl.endsWith("?") ? queryUrl.substring(0, queryUrl.length() - 1) : queryUrl;
}
}
Loading

0 comments on commit 9b13481

Please sign in to comment.